Skip to content

Commit

Permalink
feat: optional async ops
Browse files Browse the repository at this point in the history
  • Loading branch information
kt3k committed Jan 19, 2020
1 parent 34b99fe commit 1d2fb20
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 46 deletions.
32 changes: 19 additions & 13 deletions cli/ops/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ fn op_fetch_source_files(
Ok(v.into())
});

Ok(JsonOp::Async(future))
Ok(JsonOp::Async(future, true))
}

#[derive(Deserialize, Debug)]
Expand All @@ -166,13 +166,16 @@ fn op_compile(
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CompileArgs = serde_json::from_value(args)?;
Ok(JsonOp::Async(runtime_compile_async(
state.global_state.clone(),
&args.root_name,
&args.sources,
args.bundle,
&args.options,
)))
Ok(JsonOp::Async(
runtime_compile_async(
state.global_state.clone(),
&args.root_name,
&args.sources,
args.bundle,
&args.options,
),
true,
))
}

#[derive(Deserialize, Debug)]
Expand All @@ -187,9 +190,12 @@ fn op_transpile(
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: TranspileArgs = serde_json::from_value(args)?;
Ok(JsonOp::Async(runtime_transpile_async(
state.global_state.clone(),
&args.sources,
&args.options,
)))
Ok(JsonOp::Async(
runtime_transpile_async(
state.global_state.clone(),
&args.sources,
&args.options,
),
true,
))
}
11 changes: 6 additions & 5 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ pub type AsyncJsonOp =

pub enum JsonOp {
Sync(Value),
Async(AsyncJsonOp),
/** The 2nd element is true when the op blocks exiting, false otherwise. */
Async(AsyncJsonOp, bool),
}

fn json_err(err: ErrBox) -> Value {
Expand Down Expand Up @@ -70,19 +71,19 @@ where
assert!(promise_id.is_none());
CoreOp::Sync(serialize_result(promise_id, Ok(sync_value)))
}
Ok(JsonOp::Async(fut)) => {
Ok(JsonOp::Async(fut, blocks_exit)) => {
assert!(promise_id.is_some());
let fut2 = fut.then(move |result| {
futures::future::ok(serialize_result(promise_id, result))
});
CoreOp::Async(fut2.boxed())
CoreOp::Async(fut2.boxed(), blocks_exit)
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
if is_sync {
CoreOp::Sync(buf)
} else {
CoreOp::Async(futures::future::ok(buf).boxed())
CoreOp::Async(futures::future::ok(buf).boxed(), true)
}
}
}
Expand All @@ -101,6 +102,6 @@ where
let handle = pool
.spawn_with_handle(futures::future::lazy(move |_cx| f()))
.unwrap();
Ok(JsonOp::Async(handle.boxed()))
Ok(JsonOp::Async(handle.boxed(), true))
}
}
2 changes: 1 addition & 1 deletion cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ where
// works since they're simple polling futures.
Op::Sync(futures::executor::block_on(fut).unwrap())
} else {
Op::Async(fut.boxed())
Op::Async(fut.boxed(), true)
}
}
}
2 changes: 1 addition & 1 deletion cli/ops/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,5 @@ pub fn op_fetch(
Ok(json_res)
};

Ok(JsonOp::Async(future.boxed()))
Ok(JsonOp::Async(future.boxed(), true))
}
4 changes: 2 additions & 2 deletions cli/ops/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn op_open(
let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf))
} else {
Ok(JsonOp::Async(fut.boxed()))
Ok(JsonOp::Async(fut.boxed(), true))
}
}

Expand Down Expand Up @@ -171,6 +171,6 @@ fn op_seek(
let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf))
} else {
Ok(JsonOp::Async(fut.boxed()))
Ok(JsonOp::Async(fut.boxed(), true))
}
}
4 changes: 2 additions & 2 deletions cli/ops/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn op_accept(
}))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -157,7 +157,7 @@ fn op_connect(
}))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

#[derive(Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion cli/ops/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ fn op_run_status(
let pool = futures::executor::ThreadPool::new().unwrap();
let handle = pool.spawn_with_handle(future).unwrap();

Ok(JsonOp::Async(handle.boxed()))
Ok(JsonOp::Async(handle.boxed(), true))
}

#[derive(Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion cli/ops/timers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn op_global_timer(
.new_timeout(deadline)
.then(move |_| futures::future::ok(json!({})));

Ok(JsonOp::Async(f.boxed()))
Ok(JsonOp::Async(f.boxed(), true))
}

// Returns a milliseconds and nanoseconds subsec
Expand Down
4 changes: 2 additions & 2 deletions cli/ops/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub fn op_connect_tls(
}))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

fn load_certs(path: &str) -> Result<Vec<Certificate>, ErrBox> {
Expand Down Expand Up @@ -376,5 +376,5 @@ fn op_accept_tls(
}))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}
8 changes: 4 additions & 4 deletions cli/ops/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn op_worker_get_message(
Ok(json!({ "data": maybe_buf }))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

/// Post message to host as guest worker
Expand Down Expand Up @@ -258,7 +258,7 @@ fn op_host_get_worker_loaded(
Ok(serialize_worker_result(result))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

fn op_host_poll_worker(
Expand Down Expand Up @@ -286,7 +286,7 @@ fn op_host_poll_worker(

Ok(serialize_worker_result(result))
};
Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

fn op_host_close_worker(
Expand Down Expand Up @@ -348,7 +348,7 @@ fn op_host_get_message(
Ok(json!({ "data": maybe_buf }))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

#[derive(Deserialize)]
Expand Down
4 changes: 2 additions & 2 deletions cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ impl ThreadSafeState {
state.metrics_op_completed(buf.len());
Op::Sync(buf)
}
Op::Async(fut) => {
Op::Async(fut, blocks_exit) => {
let state = state.clone();
let result_fut = fut.map_ok(move |buf: Buf| {
state.metrics_op_completed(buf.len());
buf
});
Op::Async(result_fut.boxed())
Op::Async(result_fut.boxed(), blocks_exit)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/es_isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ pub mod tests {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ok(buf).boxed(), true)
};

isolate.register_op("test", dispatcher);
Expand Down
2 changes: 1 addition & 1 deletion core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn http_op(
if is_sync {
Op::Sync(futures::executor::block_on(fut).unwrap())
} else {
Op::Async(fut.boxed())
Op::Async(fut.boxed(), true)
}
}
}
Expand Down
53 changes: 46 additions & 7 deletions core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,11 @@ impl Isolate {
let op_id = 0;
Some((op_id, buf))
}
Op::Async(fut) => {
Op::Async(fut, blocks_exit) => {
let fut2 = fut.map_ok(move |buf| (op_id, buf));
self.pending_ops.push(fut2.boxed());
self
.pending_ops
.push(Pin::new(Box::new(PendingOp(fut2.boxed(), blocks_exit))));
self.have_unpolled_ops = true;
None
}
Expand Down Expand Up @@ -742,8 +744,8 @@ impl Future for Isolate {
inner.check_promise_errors();
inner.check_last_exception()?;

// We're idle if pending_ops is empty.
if inner.pending_ops.is_empty() {
// We're idle if all pending_ops have blocks_exit flag false.
if inner.pending_ops.iter().all(|op| !op.1) {
Poll::Ready(Ok(()))
} else {
if inner.have_unpolled_ops {
Expand Down Expand Up @@ -814,6 +816,7 @@ pub mod tests {

pub enum Mode {
Async,
AsyncOptional,
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
Expand All @@ -834,7 +837,18 @@ pub mod tests {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ok(buf).boxed(), true)
}
Mode::AsyncOptional => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Ok(buf)
};
Op::Async(fut.boxed(), false)
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
Expand All @@ -853,7 +867,7 @@ pub mod tests {
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ok(buf).boxed(), true)
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
Expand All @@ -862,7 +876,7 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ok(buf).boxed(), true)
}
}
};
Expand Down Expand Up @@ -953,6 +967,31 @@ pub mod tests {
});
}

#[test]
fn test_poll_async_optional_ops() {
run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::AsyncOptional);
js_check(isolate.execute(
"check1.js",
r#"
Deno.core.setAsyncHandler(1, (buf) => {
// This handler will never be called
assert(false);
});
let control = new Uint8Array([42]);
Deno.core.send(1, control);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
// The above op never finish, but isolate can finish
// because the op is an optional async op (blocks_exit flag == false).
assert!(match isolate.poll_unpin(cx) {
Poll::Ready(Ok(_)) => true,
_ => false,
});
})
}

#[test]
fn terminate_execution() {
let (tx, rx) = std::sync::mpsc::channel::<bool>();
Expand Down
17 changes: 15 additions & 2 deletions core/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::RwLock;
use std::task::{Context, Poll};

pub type OpId = u32;

Expand All @@ -13,14 +14,26 @@ pub type Buf = Box<[u8]>;
pub type OpAsyncFuture<E> =
Pin<Box<dyn Future<Output = Result<Buf, E>> + Send>>;

pub(crate) type PendingOpFuture =
pub(crate) type PendingOpInnerFuture =
Pin<Box<dyn Future<Output = Result<(OpId, Buf), CoreError>> + Send>>;

pub(crate) struct PendingOp(pub PendingOpInnerFuture, pub bool);

impl Future for PendingOp {
type Output = Result<(OpId, Buf), CoreError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.as_mut().0.as_mut().poll(cx)
}
}

pub(crate) type PendingOpFuture = Pin<Box<PendingOp>>;

pub type OpResult<E> = Result<Op<E>, E>;

pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
/** The 2nd element is true when the op blocks exiting, false otherwise. */
Async(OpAsyncFuture<E>, bool),
}

pub type CoreError = ();
Expand Down
2 changes: 1 addition & 1 deletion test_plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ pub fn op_test_async(data: &[u8], zero_copy: Option<PinnedBuf>) -> CoreOp {
Ok(result_box)
};

Op::Async(fut.boxed())
Op::Async(fut.boxed(), true)
}

0 comments on commit 1d2fb20

Please sign in to comment.