diff --git a/cli/tests/integration/async_io.rs b/cli/tests/integration/async_io.rs
index 785fab50..bf1c1298 100644
--- a/cli/tests/integration/async_io.rs
+++ b/cli/tests/integration/async_io.rs
@@ -1,12 +1,9 @@
-use crate::common::Test;
-use crate::common::TestResult;
-use hyper::Body;
-use hyper::Request;
-use hyper::Response;
-use hyper::StatusCode;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
+use crate::common::{Test, TestResult};
+use hyper::{body::HttpBody, Body, Request, Response, StatusCode};
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
use tokio::sync::Barrier;
// On Windows, streaming body backpressure doesn't seem to work as expected, either
@@ -22,11 +19,20 @@ async fn async_io_methods() -> TestResult {
let req_count_1 = request_count.clone();
let req_count_2 = request_count.clone();
let req_count_3 = request_count.clone();
+ let req_count_4 = request_count.clone();
let barrier = Arc::new(Barrier::new(3));
let barrier_1 = barrier.clone();
let barrier_2 = barrier.clone();
+ let sync_barrier = Arc::new(Barrier::new(2));
+ let sync_barrier_1 = sync_barrier.clone();
+ // We set up 4 async backends below, configured to test different
+ // combinations of async behavior from the guest. The first three backends
+ // are things we are actually testing, and the fourth ("Semaphore") is just
+ // used as a synchronization mechanism. Each backend will receive 4 requests
+ // total and will behave differently depending on which request # it is
+ // processing.
let test = Test::using_fixture("async_io.wasm")
.async_backend("Simple", "/", None, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "simple.org");
@@ -34,25 +40,11 @@ async fn async_io_methods() -> TestResult {
let barrier_1 = barrier_1.clone();
Box::new(async move {
match req_count_1.load(Ordering::Relaxed) {
- 0 => {
- barrier_1.wait().await;
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::empty())
- .unwrap()
- }
1 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
- 2 => {
- barrier_1.wait().await;
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::empty())
- .unwrap()
- }
- 3 => {
+ 0 | 2 | 3 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
@@ -69,21 +61,11 @@ async fn async_io_methods() -> TestResult {
let req_count_2 = req_count_2.clone();
Box::new(async move {
match req_count_2.load(Ordering::Relaxed) {
- 0 => Response::builder()
- .header("Transfer-Encoding", "chunked")
- .status(StatusCode::OK)
- .body(Body::empty())
- .unwrap(),
- 1 => Response::builder()
- .header("Transfer-Encoding", "chunked")
- .status(StatusCode::OK)
- .body(Body::empty())
- .unwrap(),
2 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
- 3 => Response::builder()
+ 0 | 1 | 3 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
@@ -97,36 +79,63 @@ async fn async_io_methods() -> TestResult {
assert_eq!(req.headers()["Host"], "writebody.org");
let req_count_3 = req_count_3.clone();
let barrier_2 = barrier_2.clone();
+ let sync_barrier = sync_barrier.clone();
Box::new(async move {
match req_count_3.load(Ordering::Relaxed) {
- 0 => {
- barrier_2.wait().await;
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::empty())
- .unwrap()
- }
- 1 => {
- barrier_2.wait().await;
+ 3 => {
+ // Read at least 4MB and one 8K chunk from the request
+ // to relieve back-pressure for the guest. These numbers
+ // come from the amount of data that the guest writes to
+ // the request body in test-fixtures/src/bin/async_io.rs
+ let mut bod = req.into_body();
+ let mut bytes_read = 0;
+ while bytes_read < (4 * 1024 * 1024) + (8 * 1024) {
+ if let Some(Ok(bytes)) = bod.data().await {
+ bytes_read += bytes.len();
+ }
+ }
+
+ // The guest will have another outstanding request to
+ // the Semaphore backend below. Awaiting on the barrier
+ // here will cause that request to return indicating to
+ // the guest that we have read from the request body
+ // and the write handle should be ready again.
+ sync_barrier.wait().await;
+ let _body = hyper::body::to_bytes(bod).await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
- 2 => {
+ 0 | 1 | 2 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
+ _ => unreachable!(),
+ }
+ })
+ })
+ .await
+ .async_backend("Semaphore", "/", None, move |req: Request<Body>| {
+ assert_eq!(req.headers()["Host"], "writebody.org");
+ let req_count_4 = req_count_4.clone();
+ let sync_barrier_1 = sync_barrier_1.clone();
+ Box::new(async move {
+ match req_count_4.load(Ordering::Relaxed) {
3 => {
- let _body = hyper::body::to_bytes(req.into_body()).await;
+ sync_barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
+ 0 | 1 | 2 => Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())
+ .unwrap(),
_ => unreachable!(),
}
})
diff --git a/lib/src/execute.rs b/lib/src/execute.rs
index 865ef64b..44ab83d2 100644
--- a/lib/src/execute.rs
+++ b/lib/src/execute.rs
@@ -1,7 +1,5 @@
//! Guest code execution.
-use std::net::Ipv4Addr;
-
use {
crate::{
body::Body,
@@ -18,11 +16,12 @@ use {
hyper::{Request, Response},
std::{
collections::HashSet,
- net::IpAddr,
+ net::{IpAddr, Ipv4Addr},
path::{Path, PathBuf},
- sync::atomic::AtomicU64,
+ sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::Arc,
- time::Instant,
+ thread::{self, JoinHandle},
+ time::{Duration, Instant},
},
tokio::sync::oneshot::{self, Sender},
tracing::{event, info, info_span, Instrument, Level},
@@ -61,6 +60,9 @@ pub struct ExecuteCtx {
object_store: Arc<ObjectStores>,
/// The secret stores for this execution.
secret_stores: Arc<SecretStores>,
+ // `Arc` for the two fields below because this struct must be `Clone`.
+ epoch_increment_thread: Option<Arc<JoinHandle<()>>>,
+ epoch_increment_stop: Arc<AtomicBool>,
}
impl ExecuteCtx {
@@ -77,6 +79,18 @@ impl ExecuteCtx {
let module = Module::from_file(&engine, module_path)?;
let instance_pre = linker.instantiate_pre(&module)?;
+ // Create the epoch-increment thread.
+ let epoch_interruption_period = Duration::from_micros(50);
+ let epoch_increment_stop = Arc::new(AtomicBool::new(false));
+ let engine_clone = engine.clone();
+ let epoch_increment_stop_clone = epoch_increment_stop.clone();
+ let epoch_increment_thread = Some(Arc::new(thread::spawn(move || {
+ while !epoch_increment_stop_clone.load(Ordering::Relaxed) {
+ thread::sleep(epoch_interruption_period);
+ engine_clone.increment_epoch();
+ }
+ })));
+
Ok(Self {
engine,
instance_pre: Arc::new(instance_pre),
@@ -90,6 +104,8 @@ impl ExecuteCtx {
next_req_id: Arc::new(AtomicU64::new(0)),
object_store: Arc::new(ObjectStores::new()),
secret_stores: Arc::new(SecretStores::new()),
+ epoch_increment_thread,
+ epoch_increment_stop,
})
}
@@ -104,11 +120,9 @@ impl ExecuteCtx {
}
/// Set the backends for this execution context.
- pub fn with_backends(self, backends: Backends) -> Self {
- Self {
- backends: Arc::new(backends),
- ..self
- }
+ pub fn with_backends(mut self, backends: Backends) -> Self {
+ self.backends = Arc::new(backends);
+ self
}
/// Get the geolocation mappings for this execution context.
@@ -117,11 +131,9 @@ impl ExecuteCtx {
}
/// Set the geolocation mappings for this execution context.
- pub fn with_geolocation(self, geolocation: Geolocation) -> Self {
- Self {
- geolocation: Arc::new(geolocation),
- ..self
- }
+ pub fn with_geolocation(mut self, geolocation: Geolocation) -> Self {
+ self.geolocation = Arc::new(geolocation);
+ self
}
/// Get the dictionaries for this execution context.
@@ -130,35 +142,27 @@ impl ExecuteCtx {
}
/// Set the dictionaries for this execution context.
- pub fn with_dictionaries(self, dictionaries: Dictionaries) -> Self {
- Self {
- dictionaries: Arc::new(dictionaries),
- ..self
- }
+ pub fn with_dictionaries(mut self, dictionaries: Dictionaries) -> Self {
+ self.dictionaries = Arc::new(dictionaries);
+ self
}
/// Set the object store for this execution context.
- pub fn with_object_stores(self, object_store: ObjectStores) -> Self {
- Self {
- object_store: Arc::new(object_store),
- ..self
- }
+ pub fn with_object_stores(mut self, object_store: ObjectStores) -> Self {
+ self.object_store = Arc::new(object_store);
+ self
}
/// Set the secret stores for this execution context.
- pub fn with_secret_stores(self, secret_stores: SecretStores) -> Self {
- Self {
- secret_stores: Arc::new(secret_stores),
- ..self
- }
+ pub fn with_secret_stores(mut self, secret_stores: SecretStores) -> Self {
+ self.secret_stores = Arc::new(secret_stores);
+ self
}
/// Set the path to the config for this execution context.
- pub fn with_config_path(self, config_path: PathBuf) -> Self {
- Self {
- config_path: Arc::new(Some(config_path)),
- ..self
- }
+ pub fn with_config_path(mut self, config_path: PathBuf) -> Self {
+ self.config_path = Arc::new(Some(config_path));
+ self
}
/// Whether to treat stdout as a logging endpoint.
@@ -167,8 +171,9 @@ impl ExecuteCtx {
}
/// Set the stdout logging policy for this execution context.
- pub fn with_log_stdout(self, log_stdout: bool) -> Self {
- Self { log_stdout, ..self }
+ pub fn with_log_stdout(mut self, log_stdout: bool) -> Self {
+ self.log_stdout = log_stdout;
+ self
}
/// Whether to treat stderr as a logging endpoint.
@@ -177,8 +182,9 @@ impl ExecuteCtx {
}
/// Set the stderr logging policy for this execution context.
- pub fn with_log_stderr(self, log_stderr: bool) -> Self {
- Self { log_stderr, ..self }
+ pub fn with_log_stderr(mut self, log_stderr: bool) -> Self {
+ self.log_stderr = log_stderr;
+ self
}
/// Gets the TLS configuration
@@ -410,6 +416,17 @@ impl ExecuteCtx {
}
}
+impl Drop for ExecuteCtx {
+ fn drop(&mut self) {
+ if let Some(arc) = self.epoch_increment_thread.take() {
+ if let Ok(join_handle) = Arc::try_unwrap(arc) {
+ self.epoch_increment_stop.store(true, Ordering::Relaxed);
+ join_handle.join().unwrap();
+ }
+ }
+ }
+}
+
fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config {
use wasmtime::{
Config, InstanceAllocationStrategy, PoolingAllocationConfig, WasmBacktraceDetails,
@@ -419,7 +436,7 @@ fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config
config.debug_info(false); // Keep this disabled - wasmtime will hang if enabled
config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
config.async_support(true);
- config.consume_fuel(true);
+ config.epoch_interruption(true);
config.profiler(profiling_strategy);
const MB: usize = 1 << 20;
diff --git a/lib/src/linking.rs b/lib/src/linking.rs
index dfc61eac..2fff627f 100644
--- a/lib/src/linking.rs
+++ b/lib/src/linking.rs
@@ -55,7 +55,8 @@ pub(crate) fn create_store(
session,
};
let mut store = Store::new(ctx.engine(), wasm_ctx);
- store.out_of_fuel_async_yield(u64::MAX, 10000);
+ store.set_epoch_deadline(1);
+ store.epoch_deadline_async_yield_and_update(1);
Ok(store)
}
diff --git a/test-fixtures/src/bin/async_io.rs b/test-fixtures/src/bin/async_io.rs
index db38dbd4..7af400be 100644
--- a/test-fixtures/src/bin/async_io.rs
+++ b/test-fixtures/src/bin/async_io.rs
@@ -8,6 +8,7 @@ use std::str::FromStr;
use fastly::handle::{BodyHandle, RequestHandle, ResponseHandle};
use fastly::http::{HeaderName, HeaderValue, Method, StatusCode, Url};
use fastly::Error;
+use fastly::Request;
use fastly_shared::{CacheOverride, FastlyStatus};
fn is_ready(handle: u32) -> bool {
@@ -45,6 +46,12 @@ fn test_select() -> Result<(), Error> {
let (_read_body_resp, read_body) = read_body_req.send(BodyHandle::new(), "ReadBody")?;
let read_body_handle = unsafe { read_body.as_u32() };
+ // This request is used as a synchronization mechanism for the purposes of
+ // this test
+ let write_body_sync_req = Request::get("http://writebody.org/")
+ .send_async("Semaphore")
+ .expect("request begins sending");
+
// The "write body" case involves a `send_async_streaming` call, where the streaming body is the
// async item of interest. To test readiness, we need to ensure the body is large enough that Hyper
// won't try to buffer it, and hence we can see backpressure on streaming. We do this by including
@@ -74,13 +81,12 @@ fn test_select() -> Result<(), Error> {
assert!(nwritten > 0);
}
- // Give the servers a chance to do their thing. This is needed to resolve a race between the servers
- // initiating responses / reading buffers and the guest snapshotting readiness or performing `select`
- //
- // If this ever becomes flaky, an alternative would be to introduce an additional backend that
- // responds only when the other backends have reached their steady state, via a VTC semaphore, and
- // block on THAT backend here.
- std::thread::sleep(std::time::Duration::from_millis(100));
+ // We wait on this request here to give the servers a chance to do their
+ // thing. This is needed to resolve a race between the servers initiating
+ // responses / reading buffers and the guest snapshotting readiness or
+ // performing `select`. This request should return when the other backends
+ // have reached their steady state
+ write_body_sync_req.wait()?;
append_header(
&mut ds_resp,