Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch the runtime from wasmi to wasmtime + cranelift #1700

Merged
merged 22 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
fc4a420
runtime: Switch to wasmtime
leoyvens Jun 2, 2020
cba9444
graph: unconditionally use `futures::block_on`in `block_on_allow_panic`
leoyvens Jun 2, 2020
54fa314
*: Use more `anyhow::Error`
leoyvens Jun 2, 2020
93dc84e
runtime: Depend on wasmtime master
leoyvens Jun 2, 2020
eff818b
graph: Add `CompatErr`
leoyvens Jun 3, 2020
cddb1e4
runtime, graphql: Address review
leoyvens Jun 3, 2020
ecdfb69
runtime: Use released wasmtime version
leoyvens Jun 3, 2020
058ea6c
runtime: Better `i32` to/from `u32` conversion
leoyvens Jun 3, 2020
a42b393
runtime: Use weak references to the instance in host exports
leoyvens Jun 3, 2020
8e2caf5
runtime: Use `anyhow::ensure`
leoyvens Jun 4, 2020
21f9c26
runtime: Properly explain unsafe usage
leoyvens Jun 4, 2020
740f03d
runtime: Expand comments
leoyvens Jun 5, 2020
f815c8b
runtime: Rename `instance` and `borrow_instance`
leoyvens Jun 5, 2020
d98c0cc
runtime: depend on wasmtime master, use u32 where possible
leoyvens Jun 5, 2020
c090839
runtime: Don't disable timeout due to ipfs.map
leoyvens Jun 8, 2020
7d57905
runtime: Address review
leoyvens Jun 10, 2020
f8f3116
runtime: Fix flaky tests
leoyvens Jun 10, 2020
33ebb5c
runtime: Spawn test modules that use block_on in their own thread
leoyvens Jun 10, 2020
fd616a0
graph: Use tokio block_on instead of futures block_on
leoyvens Jun 10, 2020
1d08115
runtime: Remove redundant `block_in_place` in test
leoyvens Jun 10, 2020
36da2e1
graph: Fix typo
leoyvens Jun 10, 2020
65a2183
runtime: Uncomment test
leoyvens Jun 10, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
630 changes: 578 additions & 52 deletions Cargo.lock

Large diffs are not rendered by default.

38 changes: 20 additions & 18 deletions core/src/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ where
data_source: DataSource,
top_level_templates: Arc<Vec<DataSourceTemplate>>,
host_metrics: Arc<HostMetrics>,
) -> Result<T::Host, Error> {
) -> Result<T::Host, anyhow::Error> {
let mapping_request_sender = {
let module_bytes = data_source.mapping.runtime.as_ref();
let module_hash = tiny_keccak::keccak256(module_bytes);
if let Some(sender) = self.module_cache.get(&module_hash) {
sender.clone()
} else {
let sender = T::spawn_mapping(
module_bytes,
module_bytes.clone(),
logger,
self.subgraph_id.clone(),
host_metrics.clone(),
Expand All @@ -109,14 +109,16 @@ where
sender
}
};
self.host_builder.build(
self.network.clone(),
self.subgraph_id.clone(),
data_source,
top_level_templates,
mapping_request_sender,
host_metrics,
)
self.host_builder
.build(
self.network.clone(),
self.subgraph_id.clone(),
data_source,
top_level_templates,
mapping_request_sender,
host_metrics,
)
.compat_err()
}
}

Expand All @@ -137,7 +139,7 @@ where
trigger: EthereumTrigger,
state: BlockState,
proof_of_indexing: SharedProofOfIndexing,
) -> Result<BlockState, Error> {
) -> Result<BlockState, anyhow::Error> {
Self::process_trigger_in_runtime_hosts(
logger,
&self.hosts,
Expand All @@ -156,15 +158,15 @@ where
trigger: EthereumTrigger,
mut state: BlockState,
proof_of_indexing: SharedProofOfIndexing,
) -> Result<BlockState, Error> {
) -> Result<BlockState, anyhow::Error> {
match trigger {
EthereumTrigger::Log(log) => {
let log = Arc::new(log);

let transaction = block
.transaction_for_log(&log)
.map(Arc::new)
.ok_or_else(|| format_err!("Found no transaction for event"))?;
.context("Found no transaction for event")?;
That3Percent marked this conversation as resolved.
Show resolved Hide resolved
let matching_hosts = hosts.iter().filter(|host| host.matches_log(&log));
// Process the log in each host in the same order the corresponding data
// sources appear in the subgraph manifest
Expand All @@ -187,7 +189,7 @@ where

let transaction = block
.transaction_for_call(&call)
.ok_or_else(|| format_err!("Found no transaction for call"))?;
.context("Found no transaction for call")?;
let transaction = Arc::new(transaction);
let matching_hosts = hosts.iter().filter(|host| host.matches_call(&call));

Expand Down Expand Up @@ -230,14 +232,14 @@ where
data_source: DataSource,
top_level_templates: Arc<Vec<DataSourceTemplate>>,
metrics: Arc<HostMetrics>,
) -> Result<Arc<T::Host>, Error> {
) -> Result<Arc<T::Host>, anyhow::Error> {
// Protect against creating more than the allowed maximum number of data sources
if let Some(max_data_sources) = *MAX_DATA_SOURCES {
if self.hosts.len() >= max_data_sources {
return Err(format_err!(
anyhow::bail!(
That3Percent marked this conversation as resolved.
Show resolved Hide resolved
"Limit of {} data sources per subgraph exceeded",
max_data_sources
));
max_data_sources,
);
}
}

Expand Down
13 changes: 7 additions & 6 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,8 @@ where
&mut ctx,
host_metrics.clone(),
block_state.created_data_sources.drain(..),
)?;
)
.compat_err()?;

// Reprocess the triggers from this block that match the new data sources
let block_with_triggers = triggers_in_block(
Expand Down Expand Up @@ -734,7 +735,6 @@ where

// Process the triggers in each host in the same order the
// corresponding data sources have been created.

for trigger in triggers.into_iter() {
block_state = SubgraphInstance::<T>::process_trigger_in_runtime_hosts(
&logger,
Expand All @@ -744,7 +744,8 @@ where
block_state,
proof_of_indexing.cheap_clone(),
)
.await?;
.await
.compat_err()?;
}
}

Expand Down Expand Up @@ -908,12 +909,12 @@ async fn process_triggers<B: BlockStreamBuilder, T: RuntimeHostBuilder, S: Send
.await
.map_err(move |e| match transaction_id {
Some(tx_hash) => format_err!(
"Failed to process trigger in block {}, transaction {:x}: {}",
"Failed to process trigger in block {}, transaction {:x}: {:#}",
block_ptr,
tx_hash,
e
),
None => format_err!("Failed to process trigger: {}", e),
None => format_err!("Failed to process trigger: {:#}", e),
})?;
let elapsed = start.elapsed().as_secs_f64();
subgraph_metrics.observe_trigger_processing_duration(elapsed, trigger_type);
Expand All @@ -926,7 +927,7 @@ fn create_dynamic_data_sources<B, T: RuntimeHostBuilder, S>(
ctx: &mut IndexingContext<B, T, S>,
host_metrics: Arc<HostMetrics>,
created_data_sources: impl Iterator<Item = DataSourceTemplateInfo>,
) -> Result<(Vec<DataSource>, Vec<Arc<T::Host>>), Error>
) -> Result<(Vec<DataSource>, Vec<Arc<T::Host>>), anyhow::Error>
where
B: BlockStreamBuilder,
S: ChainStore + Store + SubgraphDeploymentStore + EthereumCallCache,
Expand Down
1 change: 1 addition & 0 deletions graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.18.0"
edition = "2018"

[dependencies]
anyhow = "1.0"
async-trait = "0.1.31"
atomic_refcell = "0.1.6"
bigdecimal = { version = "0.1.0", features = ["serde"] }
Expand Down
10 changes: 5 additions & 5 deletions graph/src/components/subgraph/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait RuntimeHost: Send + Sync + Debug + 'static {
log: &Arc<Log>,
state: BlockState,
proof_of_indexing: SharedProofOfIndexing,
) -> Result<BlockState, Error>;
) -> Result<BlockState, anyhow::Error>;

/// Process an Ethereum call and return a vector of entity operations
async fn process_call(
Expand All @@ -43,7 +43,7 @@ pub trait RuntimeHost: Send + Sync + Debug + 'static {
call: &Arc<EthereumCall>,
state: BlockState,
proof_of_indexing: SharedProofOfIndexing,
) -> Result<BlockState, Error>;
) -> Result<BlockState, anyhow::Error>;

/// Process an Ethereum block and return a vector of entity operations
async fn process_block(
Expand All @@ -53,7 +53,7 @@ pub trait RuntimeHost: Send + Sync + Debug + 'static {
trigger_type: &EthereumBlockTriggerType,
state: BlockState,
proof_of_indexing: SharedProofOfIndexing,
) -> Result<BlockState, Error>;
) -> Result<BlockState, anyhow::Error>;
}

pub struct HostMetrics {
Expand Down Expand Up @@ -131,9 +131,9 @@ pub trait RuntimeHostBuilder: Clone + Send + Sync + 'static {
/// Spawn a mapping and return a channel for mapping requests. The sender should be able to be
/// cached and shared among mappings that use the same wasm file.
fn spawn_mapping(
raw_module: &[u8],
raw_module: Vec<u8>,
logger: Logger,
subgraph_id: SubgraphDeploymentId,
metrics: Arc<HostMetrics>,
) -> Result<mpsc::Sender<Self::Req>, Error>;
) -> Result<mpsc::Sender<Self::Req>, anyhow::Error>;
}
6 changes: 3 additions & 3 deletions graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub trait SubgraphInstance<H: RuntimeHost> {
trigger: EthereumTrigger,
state: BlockState,
proof_of_indexing: SharedProofOfIndexing,
) -> Result<BlockState, Error>;
) -> Result<BlockState, anyhow::Error>;

/// Like `process_trigger` but processes an Ethereum event in a given list of hosts.
async fn process_trigger_in_runtime_hosts(
Expand All @@ -52,7 +52,7 @@ pub trait SubgraphInstance<H: RuntimeHost> {
trigger: EthereumTrigger,
state: BlockState,
proof_of_indexing: SharedProofOfIndexing,
) -> Result<BlockState, Error>;
) -> Result<BlockState, anyhow::Error>;

/// Adds dynamic data sources to the subgraph.
fn add_dynamic_data_source(
Expand All @@ -61,5 +61,5 @@ pub trait SubgraphInstance<H: RuntimeHost> {
data_source: DataSource,
top_level_templates: Arc<Vec<DataSourceTemplate>>,
metrics: Arc<HostMetrics>,
) -> Result<Arc<H>, Error>;
) -> Result<Arc<H>, anyhow::Error>;
}
22 changes: 12 additions & 10 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use crate::data::subgraph::schema::{
EthereumContractEventHandlerEntity, EthereumContractMappingEntity,
EthereumContractSourceEntity, SUBGRAPHS_ID,
};
use crate::prelude::{format_err, impl_slog_value, BlockNumber, Deserialize, Fail, Serialize};
use crate::prelude::{
anyhow::{self, Context},
format_err, impl_slog_value, BlockNumber, Deserialize, Fail, Serialize,
};
use crate::util::ethereum::string_to_h256;
use graphql_parser::query as q;

Expand Down Expand Up @@ -722,9 +725,9 @@ impl UnresolvedDataSource {
}

impl TryFrom<DataSourceTemplateInfo> for DataSource {
type Error = failure::Error;
type Error = anyhow::Error;

fn try_from(info: DataSourceTemplateInfo) -> Result<Self, failure::Error> {
fn try_from(info: DataSourceTemplateInfo) -> Result<Self, anyhow::Error> {
let DataSourceTemplateInfo {
data_source: _,
template,
Expand All @@ -735,19 +738,18 @@ impl TryFrom<DataSourceTemplateInfo> for DataSource {
// Obtain the address from the parameters
let string = params
.get(0)
.ok_or_else(|| {
format_err!(
.with_context(|| {
format!(
"Failed to create data source from template `{}`: address parameter is missing",
template.name
)
})?
.trim_start_matches("0x");

let address = Address::from_str(string).map_err(|e| {
format_err!(
"Failed to create data source from template `{}`: invalid address provided: {}",
template.name,
e
let address = Address::from_str(string).with_context(|| {
format!(
"Failed to create data source from template `{}`, invalid address provided",
template.name
)
})?;

Expand Down
4 changes: 3 additions & 1 deletion graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ pub use url;
/// ```
pub mod prelude {
pub use super::entity;
pub use anyhow::{self, Context as _};
pub use async_trait::async_trait;
pub use bigdecimal;
pub use ethabi;
pub use failure::{self, bail, err_msg, format_err, Error, Fail, SyncFailure};
pub use failure::{self, err_msg, format_err, Error, Fail, SyncFailure};
pub use futures::future;
pub use futures::prelude::*;
pub use futures::stream;
Expand Down Expand Up @@ -146,5 +147,6 @@ pub mod prelude {
ComponentLoggerConfig, ElasticComponentLoggerConfig, LoggerFactory,
};
pub use crate::log::split::split_logger;
pub use crate::util::error::CompatErr;
leoyvens marked this conversation as resolved.
Show resolved Hide resolved
pub use crate::util::futures::{retry, TimeoutError};
}
46 changes: 19 additions & 27 deletions graph/src/task_spawn.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
use futures03::executor::block_on;
//! The functions in this module should be used to execute futures, serving as a facade to the
//! underlying executor implementation which currently is tokio. This serves a few purposes:
//! - Avoid depending directly on tokio APIs, making upgrades or a potential switch easier.
//! - Reflect our chosen default semantics of aborting on task panic, offering `*_allow_panic`
//! functoins to opt out of that.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo -> functions

//! - Reflect that historically we've used blocking futures due to making DB calls directly within
//! futures. This point should go away once https://github.com/graphprotocol/graph-node/issues/905
//! is resolved. Then the blocking flavors should no longer accept futures but closures.
//!
//! These should not be called from within executors other than tokio, particularly the blocking
//! functions will panic in that case. We should generally avoid mixing executors whenever possible.

use futures03::future::{FutureExt, TryFutureExt};
use std::future::Future as Future03;
use std::panic::AssertUnwindSafe;
Expand All @@ -14,6 +25,11 @@ fn abort_on_panic<T: Send + 'static>(
})
}

/// Runs the future on the current thread. Panics if not within a tokio runtime.
fn block_on<T>(f: impl Future03<Output = T>) -> T {
tokio::runtime::Handle::current().block_on(f)
}

/// Aborts on panic.
pub fn spawn<T: Send + 'static>(f: impl Future03<Output = T> + Send + 'static) -> JoinHandle<T> {
tokio::spawn(abort_on_panic(f))
Expand Down Expand Up @@ -46,30 +62,6 @@ pub async fn spawn_blocking_async_allow_panic<R: 'static + Send>(
tokio::task::spawn_blocking(f).await.unwrap()
}

/// Panics if there is no current tokio::Runtime
pub fn block_on_allow_panic<T>(f: impl Future03<Output = T> + Send) -> T {
let rt = tokio::runtime::Handle::current();

rt.enter(move || {
// TODO: It should be possible to just call block_on directly, but we
// ran into this bug https://github.com/rust-lang/futures-rs/issues/2090
// which will panic if we're already in block_on. So, detect if this
// function would panic. Once we fix this, we can remove the requirement
// of + Send for f (and few functions that call this that had +Send
// added) The test which ran into this bug was
// runtime::wasm::test::ipfs_fail
let enter = futures03::executor::enter();
let oh_no = enter.is_err();
drop(enter);

// If the function would panic, fallback to the old ways.
if oh_no {
use futures::future::Future as _;
let compat = async { Result::<T, ()>::Ok(f.await) }.boxed().compat();
// This unwrap does not panic because of the above line always returning Ok
compat.wait().unwrap()
} else {
block_on(f)
}
})
pub fn block_on_allow_panic<T>(f: impl Future03<Output = T>) -> T {
block_on(f)
That3Percent marked this conversation as resolved.
Show resolved Hide resolved
}
31 changes: 31 additions & 0 deletions graph/src/util/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Converts back and forth between `failure::Error` and `anyhow::Error`
// while we don't migrate fully to `anyhow`.
pub trait CompatErr {
type Other;
fn compat_err(self) -> Self::Other;
}

impl CompatErr for failure::Error {
type Other = anyhow::Error;

fn compat_err(self) -> anyhow::Error {
anyhow::Error::from(self.compat())
}
}

impl CompatErr for anyhow::Error {
type Other = failure::Error;

fn compat_err(self) -> failure::Error {
// Convert as a single error containing all the causes.
failure::err_msg(format!("{:#}", self))
}
}

impl<T, E: CompatErr> CompatErr for Result<T, E> {
type Other = Result<T, E::Other>;

fn compat_err(self) -> Self::Other {
self.map_err(CompatErr::compat_err)
}
}
2 changes: 2 additions & 0 deletions graph/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ pub mod ethereum;
pub mod security;

pub mod lfu_cache;

pub mod error;
Loading