Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make js cancellable #1901

Merged
merged 1 commit into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,8 @@ export const action = {
async slow(effects, _input) {
while(true) {
effects.error("A");
// await ackermann(3,10);
await effects.sleep(100);
await ackermann(3,10);
// await effects.sleep(100);

}
},
Expand Down
101 changes: 23 additions & 78 deletions libs/helpers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::Duration;

use color_eyre::eyre::{eyre, Context, Error};
use futures::future::{pending, BoxFuture};
use futures::future::BoxFuture;
use futures::FutureExt;
use tokio::fs::File;
use tokio::sync::oneshot;
use tokio::task::{JoinError, JoinHandle};
use tokio::task::{JoinError, JoinHandle, LocalSet};

mod script_dir;
pub use script_dir::*;
Expand Down Expand Up @@ -210,79 +209,25 @@ impl<T: 'static + Send> TimedResource<T> {
}
}

type SingThreadTask<T> = futures::future::Select<
futures::future::Then<
oneshot::Receiver<T>,
futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>,
fn(
Result<T, oneshot::error::RecvError>,
)
-> futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>,
>,
futures::future::Then<
JoinHandle<()>,
futures::future::Pending<T>,
fn(Result<(), JoinError>) -> futures::future::Pending<T>,
>,
>;

#[pin_project::pin_project(PinnedDrop)]
pub struct SingleThreadJoinHandle<T> {
abort: Option<oneshot::Sender<()>>,
#[pin]
task: SingThreadTask<T>,
}
impl<T: Send + 'static> SingleThreadJoinHandle<T> {
pub fn new<Fut: Future<Output = T>>(fut: impl FnOnce() -> Fut + Send + 'static) -> Self {
let (abort, abort_recv) = oneshot::channel();
let (return_val_send, return_val) = oneshot::channel();
fn unwrap_recv_or_pending<T>(
res: Result<T, oneshot::error::RecvError>,
) -> futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>
{
match res {
Ok(a) => futures::future::Either::Left(futures::future::ready(a)),
_ => futures::future::Either::Right(pending()),
}
}
fn make_pending<T>(_: Result<(), JoinError>) -> futures::future::Pending<T> {
pending()
}
Self {
abort: Some(abort),
task: futures::future::select(
return_val.then(unwrap_recv_or_pending),
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
tokio::select! {
_ = abort_recv.fuse() => (),
res = fut().fuse() => {let _error = return_val_send.send(res);},
}
})
})
.then(make_pending),
),
}
}
}

impl<T: Send> Future for SingleThreadJoinHandle<T> {
type Output = T;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.task.poll(cx).map(|t| t.factor_first().0)
}
}

#[pin_project::pinned_drop]
impl<T> PinnedDrop for SingleThreadJoinHandle<T> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if let Some(abort) = this.abort.take() {
let _error = abort.send(());
}
}
pub async fn spawn_local<
T: 'static + Send,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = T> + 'static,
>(
fut: F,
) -> NonDetachingJoinHandle<T> {
let (send, recv) = tokio::sync::oneshot::channel();
std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
let set = LocalSet::new();
send.send(set.spawn_local(fut()).into())
.unwrap_or_else(|_| unreachable!());
set.await
})
});
recv.await.unwrap()
}
7 changes: 3 additions & 4 deletions libs/js_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use deno_core::{
resolve_import, Extension, JsRuntime, ModuleLoader, ModuleSource, ModuleSourceFuture,
ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot,
};
use helpers::{script_dir, SingleThreadJoinHandle};
use helpers::{script_dir, spawn_local};
use models::{ExecCommand, PackageId, ProcedureName, TermCommand, Version, VolumeId};
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -255,9 +255,8 @@ impl JsExecutionEnvironment {
));
}
};
let safer_handle =
SingleThreadJoinHandle::new(move || self.execute(procedure_name, input, variable_args));
let output = safer_handle.await?;
let safer_handle = spawn_local(|| self.execute(procedure_name, input, variable_args)).await;
let output = safer_handle.await.unwrap()?;
match serde_json::from_value(output.clone()) {
Ok(x) => Ok(x),
Err(err) => {
Expand Down