Skip to content

Commit

Permalink
runtime: Don't disable timeout due to ipfs.map
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens committed Jun 8, 2020
1 parent e5fd316 commit b2500c4
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 18 deletions.
38 changes: 20 additions & 18 deletions runtime/wasm/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::ops::Deref;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Instant;

use semver::Version;
Expand All @@ -26,7 +24,10 @@ use crate::mapping::ValidModule;
use crate::UnresolvedContractCall;

mod into_wasm_ret;
mod stopwatch;

use into_wasm_ret::IntoWasmRet;
use stopwatch::TimeoutStopwatch;

#[cfg(test)]
mod test;
Expand Down Expand Up @@ -219,8 +220,8 @@ pub(crate) struct WasmInstance {
pub(crate) host_metrics: Arc<HostMetrics>,
pub(crate) timeout: Option<Duration>,

// Used by ipfs.map to turn off interrupts.
should_interrupt: Arc<AtomicBool>,
// Used by ipfs.map.
pub(crate) timeout_stopwatch: Arc<std::sync::Mutex<TimeoutStopwatch>>,

// First free byte in the current arena.
arena_start_ptr: i32,
Expand Down Expand Up @@ -420,15 +421,19 @@ impl WasmInstance {
.context("`memory.allocate` function not found")?
.get1()?;

let should_interrupt = Arc::new(AtomicBool::new(true));
let timeout_stopwatch = Arc::new(std::sync::Mutex::new(TimeoutStopwatch::start_new()));
if let Some(timeout) = timeout {
// This task is likely to outlive the instance, which is fine.
let interrupt_handle = instance.store().interrupt_handle().unwrap();
let should_interrupt = should_interrupt.clone();
graph::spawn(async move {
tokio::time::delay_for(timeout).await;
if should_interrupt.load(Ordering::SeqCst) {
interrupt_handle.interrupt()
let timeout_stopwatch = timeout_stopwatch.clone();
graph::spawn_allow_panic(async move {
loop {
let time_left =
timeout.checked_sub(timeout_stopwatch.lock().unwrap().elapsed());
match time_left {
None => break interrupt_handle.interrupt(), // Timed out.
Some(time) => tokio::time::delay_for(time).await,
}
}
});
}
Expand All @@ -441,7 +446,7 @@ impl WasmInstance {
valid_module,
host_metrics,
timeout,
should_interrupt,
timeout_stopwatch,

// `arena_start_ptr` will be set on the first call to `raw_new`.
arena_free_size: 0,
Expand Down Expand Up @@ -717,9 +722,10 @@ impl WasmInstance {
let callback: String = self.asc_get(callback);
let user_data: store::Value = self.try_asc_get(user_data)?;

// `ipfs_map` can take a long time to process, so disable the timeout.
self.should_interrupt.store(false, Ordering::SeqCst);
let flags = self.asc_get(flags);

// Pause the timeout while running ipfs_map
self.timeout_stopwatch.lock().unwrap().stop();
let start_time = Instant::now();
let output_states = HostExports::ipfs_map(
&self.ctx.host_exports.link_resolver.clone(),
Expand Down Expand Up @@ -750,12 +756,8 @@ impl WasmInstance {
.extend(output_state.created_data_sources);
}

// Advance this module's start time by the time it took to run the entire
// ipfs_map. This has the effect of not charging this module for the time
// spent running the callback on every JSON object in the IPFS file
self.timeout_stopwatch.lock().unwrap().start();

// TODO
// self.start_time += start_time.elapsed();
Ok(())
}

Expand Down
66 changes: 66 additions & 0 deletions runtime/wasm/src/module/stopwatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copied from https://github.com/ellisonch/rust-stopwatch
// Copyright (c) 2014 Chucky Ellison <cme at freefour.com> under MIT license

use std::default::Default;
use std::time::{Duration, Instant};

#[derive(Clone, Copy)]
pub struct TimeoutStopwatch {
/// The time the stopwatch was started last, if ever.
start_time: Option<Instant>,
/// The time the stopwatch was split last, if ever.
split_time: Option<Instant>,
/// The time elapsed while the stopwatch was running (between start() and stop()).
elapsed: Duration,
}

impl Default for TimeoutStopwatch {
fn default() -> TimeoutStopwatch {
TimeoutStopwatch {
start_time: None,
split_time: None,
elapsed: Duration::from_secs(0),
}
}
}

impl TimeoutStopwatch {
/// Returns a new stopwatch.
pub fn new() -> TimeoutStopwatch {
let sw: TimeoutStopwatch = Default::default();
return sw;
}

/// Returns a new stopwatch which will immediately be started.
pub fn start_new() -> TimeoutStopwatch {
let mut sw = TimeoutStopwatch::new();
sw.start();
return sw;
}

/// Starts the stopwatch.
pub fn start(&mut self) {
self.start_time = Some(Instant::now());
}

/// Stops the stopwatch.
pub fn stop(&mut self) {
self.elapsed = self.elapsed();
self.start_time = None;
self.split_time = None;
}

/// Returns the elapsed time since the start of the stopwatch.
pub fn elapsed(&self) -> Duration {
match self.start_time {
// stopwatch is running
Some(t1) => {
return t1.elapsed() + self.elapsed;
}
// stopwatch is not running
None => {
return self.elapsed;
}
}
}
}

0 comments on commit b2500c4

Please sign in to comment.