Skip to content

Commit

Permalink
feat: Add a future which can avoid creating an event loop for sync calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Marwes committed Jan 28, 2017
1 parent 120f9d1 commit d2132fc
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/compiler_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl<E> Executable<()> for CompileValue<E>
let CompileValue { expr, typ, mut function } = self;
function.id = Symbol::from(name);
let closure = vm.global_env().new_global_thunk(function)?;
let value = vm.call_thunk(closure).wait()?;
let value = vm.call_thunk(closure)?.wait()?;
Ok(ExecuteValue {
expr: expr,
typ: typ,
Expand All @@ -321,7 +321,7 @@ impl<E> Executable<()> for CompileValue<E>
let CompileValue { mut expr, typ, function } = self;
let metadata = metadata::metadata(&*vm.get_env(), expr.borrow_mut());
let closure = vm.global_env().new_global_thunk(function)?;
let value = vm.call_thunk(closure).wait()?;
let value = vm.call_thunk(closure)?.wait()?;
vm.set_global(closure.function.name.clone(), typ, metadata, value)?;
info!("Loaded module `{}` filename", filename);
Ok(())
Expand Down
12 changes: 4 additions & 8 deletions vm/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,15 +877,11 @@ impl<'vm, F> AsyncPushable<'vm> for FutureResult<F>
where F: Future<Error = Error> + Send + 'static,
F::Item: Pushable<'vm>,
{
fn async_push(mut self, vm: &'vm Thread, context: &mut Context) -> Result<Async<()>> {
match self.0.poll() {
Ok(Async::Ready(value)) => value.push(vm, context).map(Async::Ready),
Ok(Async::NotReady) => unsafe {
context.return_future(self.0);
Ok(Async::NotReady)
},
Err(err) => Err(err),
fn async_push(self, _: &'vm Thread, context: &mut Context) -> Result<Async<()>> {
unsafe {
context.return_future(self.0);
}
Ok(Async::NotReady)
}
}

Expand Down
59 changes: 59 additions & 0 deletions vm/src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use futures::{Async, Future, Poll};

use Error;

pub enum FutureValue<F>
where F: Future,
{
Value(F::Item),
Future(F),
Polled,
}

impl<F> FutureValue<F>
where F: Future<Error = Error>,
{
pub fn sync_or_error(self) -> Result<F::Item, Error> {
match self {
FutureValue::Value(v) => Ok(v),
FutureValue::Future(_) => {
Err(Error::Message("Future needs to be resolved asynchronously".into()))
}
FutureValue::Polled => {
panic!("`FutureValue` may not be polled again if it contained a value")
}
}
}
}

impl<F> Future for FutureValue<F>
where F: Future,
{
type Item = F::Item;
type Error = F::Error;

fn poll(&mut self) -> Poll<F::Item, F::Error> {
match *self {
FutureValue::Value(_) => {
match ::std::mem::replace(self, FutureValue::Polled) {
FutureValue::Value(v) => return Ok(Async::Ready(v)),
_ => unreachable!(),
}
}
FutureValue::Future(ref mut f) => f.poll(),
FutureValue::Polled => {
panic!("`FutureValue` may not be polled again if it contained a value")
}
}
}

fn wait(self) -> Result<F::Item, F::Error> {
match self {
FutureValue::Value(v) => Ok(v),
FutureValue::Future(f) => f.wait(),
FutureValue::Polled => {
panic!("`FutureValue` may not be polled again if it contained a value")
}
}
}
}
1 change: 1 addition & 0 deletions vm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod api;
pub mod channel;
pub mod compiler;
pub mod debug;
pub mod future;
pub mod gc;
pub mod macros;
pub mod thread;
Expand Down
16 changes: 12 additions & 4 deletions vm/src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::sync::Arc;
use std::usize;

use futures::{Async, Future, Poll};
use future::FutureValue;

use base::metadata::Metadata;
use base::pos::Line;
Expand Down Expand Up @@ -475,7 +476,9 @@ impl Thread {

/// Internal functions for interacting with threads. These functions should be considered both
/// unsafe and unstable.
pub trait ThreadInternal {
pub trait ThreadInternal
where for<'a> &'a Self: Deref<Target = Thread>,
{
/// Locks and retrives this threads stack
fn context(&self) -> OwnedContext;

Expand All @@ -499,7 +502,7 @@ pub trait ThreadInternal {
-> Result<()>;

/// Evaluates a zero argument function (a thunk)
fn call_thunk(&self, closure: GcPtr<ClosureData>) -> Execute<&Self>;
fn call_thunk(&self, closure: GcPtr<ClosureData>) -> Result<FutureValue<Execute<&Self>>>;

/// Executes an `IO` action
fn execute_io(&self, value: Value) -> Result<Async<Value>>;
Expand Down Expand Up @@ -589,11 +592,16 @@ impl ThreadInternal for Thread {
Ok(())
}

fn call_thunk(&self, closure: GcPtr<ClosureData>) -> Execute<&Thread> {
fn call_thunk(&self, closure: GcPtr<ClosureData>) -> Result<FutureValue<Execute<&Thread>>> {
let mut context = self.current_context();
context.stack.push(Closure(closure));
context.borrow_mut().enter_scope(0, State::Closure(closure));
Execute { thread: self }
context.execute().map(|async| {
match async {
Async::Ready(context) => FutureValue::Value(context.unwrap().stack.pop()),
Async::NotReady => FutureValue::Future(Execute { thread: self }),
}
})
}

/// Calls a module, allowed to to run IO expressions
Expand Down

0 comments on commit d2132fc

Please sign in to comment.