diff --git a/Cargo.lock b/Cargo.lock index 15408ba75af6c..3bfcf637cb60b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4528,6 +4528,20 @@ dependencies = [ "sp-std", ] +[[package]] +name = "pallet-example-parallel" +version = "2.0.0" +dependencies = [ + "frame-support", + "frame-system", + "parity-scale-codec", + "sp-core", + "sp-io", + "sp-runtime", + "sp-std", + "sp-tasks", +] + [[package]] name = "pallet-grandpa" version = "2.0.0" @@ -6711,6 +6725,7 @@ dependencies = [ "sp-runtime-interface", "sp-serializer", "sp-state-machine", + "sp-tasks", "sp-tracing", "sp-trie", "sp-version", @@ -7150,6 +7165,7 @@ dependencies = [ "sp-runtime", "sp-sandbox", "sp-std", + "sp-tasks", "substrate-wasm-builder-runner", ] @@ -8436,6 +8452,19 @@ dependencies = [ "sp-std", ] +[[package]] +name = "sp-tasks" +version = "2.0.0" +dependencies = [ + "log", + "parity-scale-codec", + "sp-core", + "sp-externalities", + "sp-io", + "sp-runtime-interface", + "sp-std", +] + [[package]] name = "sp-test-primitives" version = "2.0.0" diff --git a/Cargo.toml b/Cargo.toml index dc78ad41571fd..ab2d6e27824da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ members = [ "frame/evm", "frame/example", "frame/example-offchain-worker", + "frame/example-parallel", "frame/executive", "frame/grandpa", "frame/identity", @@ -162,6 +163,7 @@ members = [ "primitives/std", "primitives/version", "primitives/state-machine", + "primitives/tasks", "primitives/timestamp", "primitives/test-primitives", "primitives/transaction-pool", diff --git a/client/executor/Cargo.toml b/client/executor/Cargo.toml index 3220ae7161b7a..b88e8926be141 100644 --- a/client/executor/Cargo.toml +++ b/client/executor/Cargo.toml @@ -18,6 +18,7 @@ derive_more = "0.99.2" codec = { package = "parity-scale-codec", version = "1.3.4" } sp-io = { version = "2.0.0", path = "../../primitives/io" } sp-core = { version = "2.0.0", path = "../../primitives/core" } +sp-tasks = { version = "2.0.0", path = "../../primitives/tasks" } sp-trie = { version = "2.0.0", path = "../../primitives/trie" } sp-serializer = { version = "2.0.0", path = "../../primitives/serializer" } sp-version = { version = "2.0.0", path = "../../primitives/version" } diff --git a/client/executor/common/src/error.rs b/client/executor/common/src/error.rs index 04850e6f8dd7e..caed63c183e68 100644 --- a/client/executor/common/src/error.rs +++ b/client/executor/common/src/error.rs @@ -81,6 +81,25 @@ pub enum Error { /// Execution of a host function failed. #[display(fmt="Host function {} execution failed with: {}", _0, _1)] FunctionExecution(String, String), + /// No table is present. + /// + /// Call was requested that requires table but none was present in the instance. + #[display(fmt="No table exported by wasm blob")] + NoTable, + /// No table entry is present. + /// + /// Call was requested that requires specific entry in the table to be present. + #[display(fmt="No table entry with index {} in wasm blob exported table", _0)] + #[from(ignore)] + NoTableEntryWithIndex(u32), + /// Table entry is not a function. + #[display(fmt="Table element with index {} is not a function in wasm blob exported table", _0)] + #[from(ignore)] + TableElementIsNotAFunction(u32), + /// Function in table is null and thus cannot be called. + #[display(fmt="Table entry with index {} in wasm blob is null", _0)] + #[from(ignore)] + FunctionRefIsNull(u32), } impl std::error::Error for Error { diff --git a/client/executor/common/src/wasm_runtime.rs b/client/executor/common/src/wasm_runtime.rs index b59ca8ba930ed..c407d9967cbf9 100644 --- a/client/executor/common/src/wasm_runtime.rs +++ b/client/executor/common/src/wasm_runtime.rs @@ -19,6 +19,46 @@ use crate::error::Error; use sp_wasm_interface::Value; +/// A method to be used to find the entrypoint when calling into the runtime +/// +/// Contains variants on how to resolve wasm function that will be invoked. +pub enum InvokeMethod<'a> { + /// Call function exported with this name. + /// + /// Located function should have (u32, u32) -> u64 signature. + Export(&'a str), + /// Call a function found in the exported table found under the given index. + /// + /// Located function should have (u32, u32) -> u64 signature. + Table(u32), + /// Call function by reference from table through a wrapper. + /// + /// Invoked function (`dispatcher_ref`) function + /// should have (u32, u32, u32) -> u64 signature. + /// + /// `func` will be passed to the invoked function as a first argument. + TableWithWrapper { + /// Wrapper for the call. + /// + /// Function pointer, index into runtime exported table. + dispatcher_ref: u32, + /// Extra argument for dispatch. + /// + /// Common usage would be to use it as an actual wasm function pointer + /// that should be invoked, but can be used as any extra argument on the + /// callee side. + /// + /// This is typically generated and invoked by the runtime itself. + func: u32, + }, +} + +impl<'a> From<&'a str> for InvokeMethod<'a> { + fn from(val: &'a str) -> InvokeMethod<'a> { + InvokeMethod::Export(val) + } +} + /// A trait that defines an abstract WASM runtime module. /// /// This can be implemented by an execution engine. @@ -31,11 +71,24 @@ pub trait WasmModule: Sync + Send { /// /// This can be implemented by an execution engine. pub trait WasmInstance: Send { - /// Call a method on this WASM instance and reset it afterwards. + /// Call a method on this WASM instance. + /// + /// Before execution, instance is reset. + /// + /// Returns the encoded result on success. + fn call(&self, method: InvokeMethod, data: &[u8]) -> Result, Error>; + + /// Call an exported method on this WASM instance. + /// + /// Before execution, instance is reset. + /// /// Returns the encoded result on success. - fn call(&self, method: &str, data: &[u8]) -> Result, Error>; + fn call_export(&self, method: &str, data: &[u8]) -> Result, Error> { + self.call(method.into(), data) + } /// Get the value from a global with the given `name`. + /// /// This method is only suitable for getting immutable globals. fn get_global_const(&self, name: &str) -> Result, Error>; } diff --git a/client/executor/runtime-test/Cargo.toml b/client/executor/runtime-test/Cargo.toml index 444b69f84496c..ba23e31febee5 100644 --- a/client/executor/runtime-test/Cargo.toml +++ b/client/executor/runtime-test/Cargo.toml @@ -13,12 +13,13 @@ repository = "https://github.com/paritytech/substrate/" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -sp-std = { version = "2.0.0", default-features = false, path = "../../../primitives/std" } -sp-io = { version = "2.0.0", default-features = false, path = "../../../primitives/io" } -sp-sandbox = { version = "0.8.0", default-features = false, path = "../../../primitives/sandbox" } +sp-allocator = { version = "2.0.0", default-features = false, path = "../../../primitives/allocator" } sp-core = { version = "2.0.0", default-features = false, path = "../../../primitives/core" } +sp-io = { version = "2.0.0", default-features = false, path = "../../../primitives/io" } sp-runtime = { version = "2.0.0", default-features = false, path = "../../../primitives/runtime" } -sp-allocator = { version = "2.0.0", default-features = false, path = "../../../primitives/allocator" } +sp-sandbox = { version = "0.8.0", default-features = false, path = "../../../primitives/sandbox" } +sp-std = { version = "2.0.0", default-features = false, path = "../../../primitives/std" } +sp-tasks = { version = "2.0.0", default-features = false, path = "../../../primitives/tasks" } [build-dependencies] wasm-builder-runner = { version = "2.0.0", package = "substrate-wasm-builder-runner", path = "../../../utils/wasm-builder-runner" } @@ -26,10 +27,11 @@ wasm-builder-runner = { version = "2.0.0", package = "substrate-wasm-builder-run [features] default = [ "std" ] std = [ + "sp-allocator/std", + "sp-core/std", "sp-io/std", + "sp-runtime/std", "sp-sandbox/std", "sp-std/std", - "sp-core/std", - "sp-runtime/std", - "sp-allocator/std", + "sp-tasks/std", ] diff --git a/client/executor/runtime-test/src/lib.rs b/client/executor/runtime-test/src/lib.rs index 04397afd776d7..404530c1c3ebf 100644 --- a/client/executor/runtime-test/src/lib.rs +++ b/client/executor/runtime-test/src/lib.rs @@ -309,6 +309,43 @@ sp_core::wasm_export_functions! { assert_ne!(test_message, message_slice); message_slice.copy_from_slice(test_message); } + + fn test_spawn() { + let data = vec![1u8, 2u8]; + let data_new = sp_tasks::spawn(tasks::incrementer, data).join(); + + assert_eq!(data_new, vec![2u8, 3u8]); + } + + fn test_nested_spawn() { + let data = vec![7u8, 13u8]; + let data_new = sp_tasks::spawn(tasks::parallel_incrementer, data).join(); + + assert_eq!(data_new, vec![10u8, 16u8]); + } + + fn test_panic_in_spawned() { + sp_tasks::spawn(tasks::panicker, vec![]).join(); + } + } + + #[cfg(not(feature = "std"))] + mod tasks { + use sp_std::prelude::*; + + pub fn incrementer(data: Vec) -> Vec { + data.into_iter().map(|v| v + 1).collect() + } + + pub fn panicker(_: Vec) -> Vec { + panic!() + } + + pub fn parallel_incrementer(data: Vec) -> Vec { + let first = data.into_iter().map(|v| v + 2).collect::>(); + let second = sp_tasks::spawn(incrementer, first).join(); + second + } } #[cfg(not(feature = "std"))] diff --git a/client/executor/src/integration_tests/mod.rs b/client/executor/src/integration_tests/mod.rs index 3ff676fdbe61f..c8b763a6b1936 100644 --- a/client/executor/src/integration_tests/mod.rs +++ b/client/executor/src/integration_tests/mod.rs @@ -555,13 +555,13 @@ fn returns_mutable_static(wasm_method: WasmExecutionMethod) { ).expect("Creates runtime"); let instance = runtime.new_instance().unwrap(); - let res = instance.call("returns_mutable_static", &[0]).unwrap(); + let res = instance.call_export("returns_mutable_static", &[0]).unwrap(); assert_eq!(33, u64::decode(&mut &res[..]).unwrap()); // We expect that every invocation will need to return the initial // value plus one. If the value increases more than that then it is // a sign that the wasm runtime preserves the memory content. - let res = instance.call("returns_mutable_static", &[0]).unwrap(); + let res = instance.call_export("returns_mutable_static", &[0]).unwrap(); assert_eq!(33, u64::decode(&mut &res[..]).unwrap()); } @@ -590,11 +590,11 @@ fn restoration_of_globals(wasm_method: WasmExecutionMethod) { let instance = runtime.new_instance().unwrap(); // On the first invocation we allocate approx. 768KB (75%) of stack and then trap. - let res = instance.call("allocates_huge_stack_array", &true.encode()); + let res = instance.call_export("allocates_huge_stack_array", &true.encode()); assert!(res.is_err()); // On the second invocation we allocate yet another 768KB (75%) of stack - let res = instance.call("allocates_huge_stack_array", &false.encode()); + let res = instance.call_export("allocates_huge_stack_array", &false.encode()); assert!(res.is_ok()); } @@ -616,10 +616,10 @@ fn heap_is_reset_between_calls(wasm_method: WasmExecutionMethod) { .expect("`__heap_base` is an `i32`"); let params = (heap_base as u32, 512u32 * 64 * 1024).encode(); - instance.call("check_and_set_in_heap", ¶ms).unwrap(); + instance.call_export("check_and_set_in_heap", ¶ms).unwrap(); // Cal it a second time to check that the heap was freed. - instance.call("check_and_set_in_heap", ¶ms).unwrap(); + instance.call_export("check_and_set_in_heap", ¶ms).unwrap(); } #[test_case(WasmExecutionMethod::Interpreted)] @@ -720,3 +720,51 @@ fn wasm_tracing_should_work(wasm_method: WasmExecutionMethod) { assert_eq!(span_datum.name, ""); assert_eq!(values.bool_values.get("wasm").unwrap(), &true); } + +#[test_case(WasmExecutionMethod::Interpreted)] +#[cfg_attr(feature = "wasmtime", test_case(WasmExecutionMethod::Compiled))] +fn spawning_runtime_instance_should_work(wasm_method: WasmExecutionMethod) { + + let mut ext = TestExternalities::default(); + let mut ext = ext.ext(); + + call_in_wasm( + "test_spawn", + &[], + wasm_method, + &mut ext, + ).unwrap(); +} + +#[test_case(WasmExecutionMethod::Interpreted)] +#[cfg_attr(feature = "wasmtime", test_case(WasmExecutionMethod::Compiled))] +fn spawning_runtime_instance_nested_should_work(wasm_method: WasmExecutionMethod) { + + let mut ext = TestExternalities::default(); + let mut ext = ext.ext(); + + call_in_wasm( + "test_nested_spawn", + &[], + wasm_method, + &mut ext, + ).unwrap(); +} + +#[test_case(WasmExecutionMethod::Interpreted)] +#[cfg_attr(feature = "wasmtime", test_case(WasmExecutionMethod::Compiled))] +fn panic_in_spawned_instance_panics_on_joining_its_result(wasm_method: WasmExecutionMethod) { + + let mut ext = TestExternalities::default(); + let mut ext = ext.ext(); + + let error_result = call_in_wasm( + "test_panic_in_spawned", + &[], + wasm_method, + &mut ext, + ).unwrap_err(); + + dbg!(&error_result); + assert!(format!("{}", error_result).contains("Spawned task")); +} diff --git a/client/executor/src/native_executor.rs b/client/executor/src/native_executor.rs index 0aeec98067f40..1da82313a2df9 100644 --- a/client/executor/src/native_executor.rs +++ b/client/executor/src/native_executor.rs @@ -20,15 +20,28 @@ use crate::{ RuntimeInfo, error::{Error, Result}, wasm_runtime::{RuntimeCache, WasmExecutionMethod}, }; + +use std::{ + collections::HashMap, + panic::{UnwindSafe, AssertUnwindSafe}, + result, + sync::{Arc, atomic::{AtomicU64, Ordering}, mpsc}, +}; + use sp_version::{NativeVersion, RuntimeVersion}; use codec::{Decode, Encode}; use sp_core::{ - NativeOrEncoded, traits::{CodeExecutor, Externalities, RuntimeCode, MissingHostFunctions}, + NativeOrEncoded, + traits::{ + CodeExecutor, Externalities, RuntimeCode, MissingHostFunctions, + RuntimeSpawnExt, RuntimeSpawn, + }, }; use log::trace; -use std::{result, panic::{UnwindSafe, AssertUnwindSafe}, sync::Arc}; use sp_wasm_interface::{HostFunctions, Function}; -use sc_executor_common::wasm_runtime::WasmInstance; +use sc_executor_common::wasm_runtime::{WasmInstance, WasmModule, InvokeMethod}; +use sp_externalities::ExternalitiesExt as _; +use sp_tasks::new_async_externalities; /// Default num of pages for the heap const DEFAULT_HEAP_PAGES: u64 = 1024; @@ -136,6 +149,7 @@ impl WasmExecutor { f: F, ) -> Result where F: FnOnce( + AssertUnwindSafe<&Arc>, AssertUnwindSafe<&dyn WasmInstance>, Option<&RuntimeVersion>, AssertUnwindSafe<&mut dyn Externalities>, @@ -148,10 +162,11 @@ impl WasmExecutor { self.default_heap_pages, &*self.host_functions, allow_missing_host_functions, - |instance, version, ext| { + |module, instance, version, ext| { + let module = AssertUnwindSafe(module); let instance = AssertUnwindSafe(instance); let ext = AssertUnwindSafe(ext); - f(instance, version, ext) + f(module, instance, version, ext) } )? { Ok(r) => r, @@ -179,10 +194,13 @@ impl sp_core::traits::CallInWasm for WasmExecutor { heap_pages: None, }; - self.with_instance(&code, ext, allow_missing_host_functions, |instance, _, mut ext| { + self.with_instance(&code, ext, allow_missing_host_functions, |module, instance, _, mut ext| { with_externalities_safe( &mut **ext, - move || instance.call(method, call_data), + move || { + RuntimeInstanceSpawn::register_on_externalities(module.clone()); + instance.call_export(method, call_data) + } ) }).map_err(|e| e.to_string()) } else { @@ -200,10 +218,14 @@ impl sp_core::traits::CallInWasm for WasmExecutor { let instance = AssertUnwindSafe(instance); let mut ext = AssertUnwindSafe(ext); + let module = AssertUnwindSafe(module); with_externalities_safe( &mut **ext, - move || instance.call(method, call_data), + move || { + RuntimeInstanceSpawn::register_on_externalities(module.clone()); + instance.call_export(method, call_data) + } ) .and_then(|r| r) .map_err(|e| e.to_string()) @@ -269,12 +291,149 @@ impl RuntimeInfo for NativeExecutor { runtime_code, ext, false, - |_instance, version, _ext| + |_module, _instance, version, _ext| Ok(version.cloned().ok_or_else(|| Error::ApiError("Unknown version".into()))), ) } } +/// Helper inner struct to implement `RuntimeSpawn` extension. +pub struct RuntimeInstanceSpawn { + module: Arc, + tasks: parking_lot::Mutex>>>, + counter: AtomicU64, + scheduler: Box, +} + +impl RuntimeSpawn for RuntimeInstanceSpawn { + fn spawn_call(&self, dispatcher_ref: u32, func: u32, data: Vec) -> u64 { + let new_handle = self.counter.fetch_add(1, Ordering::Relaxed); + + let (sender, receiver) = mpsc::channel(); + self.tasks.lock().insert(new_handle, receiver); + + let module = self.module.clone(); + let scheduler = self.scheduler.clone(); + self.scheduler.spawn("executor-extra-runtime-instance", Box::pin(async move { + let module = AssertUnwindSafe(module); + + let async_ext = match new_async_externalities(scheduler.clone()) { + Ok(val) => val, + Err(e) => { + log::error!( + target: "executor", + "Failed to setup externalities for async context: {}", + e, + ); + + // This will drop sender and receiver end will panic + return; + } + }; + + let mut async_ext = match async_ext.with_runtime_spawn( + Box::new(RuntimeInstanceSpawn::new(module.clone(), scheduler)) + ) { + Ok(val) => val, + Err(e) => { + log::error!( + target: "executor", + "Failed to setup runtime extension for async externalities: {}", + e, + ); + + // This will drop sender and receiver end will panic + return; + } + }; + + let result = with_externalities_safe( + &mut async_ext, + move || { + + // FIXME: Should be refactored to shared "instance factory". + // Instantiating wasm here every time is suboptimal at the moment, shared + // pool of instances should be used. + // + // https://github.com/paritytech/substrate/issues/7354 + let instance = module.new_instance() + .expect("Failed to create new instance from module"); + + instance.call( + InvokeMethod::TableWithWrapper { dispatcher_ref, func }, + &data[..], + ).expect("Failed to invoke instance.") + } + ); + + match result { + Ok(output) => { + let _ = sender.send(output); + }, + Err(error) => { + // If execution is panicked, the `join` in the original runtime code will panic as well, + // since the sender is dropped without sending anything. + log::error!("Call error in spawned task: {:?}", error); + }, + } + })); + + + new_handle + } + + fn join(&self, handle: u64) -> Vec { + let receiver = self.tasks.lock().remove(&handle).expect("No task for the handle"); + let output = receiver.recv().expect("Spawned task panicked for the handle"); + output + } +} + +impl RuntimeInstanceSpawn { + pub fn new( + module: Arc, + scheduler: Box, + ) -> Self { + Self { + module, + scheduler, + counter: 0.into(), + tasks: HashMap::new().into(), + } + } + + fn with_externalities_and_module( + module: Arc, + mut ext: &mut dyn Externalities, + ) -> Option { + ext.extension::() + .map(move |task_ext| Self::new(module, task_ext.clone())) + } + + /// Register new `RuntimeSpawnExt` on current externalities. + /// + /// This extensions will spawn instances from provided `module`. + pub fn register_on_externalities(module: Arc) { + sp_externalities::with_externalities( + move |mut ext| { + if let Some(runtime_spawn) = + Self::with_externalities_and_module(module.clone(), ext) + { + if let Err(e) = ext.register_extension( + RuntimeSpawnExt(Box::new(runtime_spawn)) + ) { + trace!( + target: "executor", + "Failed to register `RuntimeSpawnExt` instance on externalities: {:?}", + e, + ) + } + } + } + ); + } +} + impl CodeExecutor for NativeExecutor { type Error = Error; @@ -295,32 +454,34 @@ impl CodeExecutor for NativeExecutor { runtime_code, ext, false, - |instance, onchain_version, mut ext| { + |module, instance, onchain_version, mut ext| { let onchain_version = onchain_version.ok_or_else( || Error::ApiError("Unknown version".into()) )?; + + let can_call_with = onchain_version.can_call_with(&self.native_version.runtime_version); + match ( use_native, - onchain_version.can_call_with(&self.native_version.runtime_version), + can_call_with, native_call, ) { - (_, false, _) => { - trace!( - target: "executor", - "Request for native execution failed (native: {}, chain: {})", - self.native_version.runtime_version, - onchain_version, - ); + (_, false, _) | (false, _, _) => { + if !can_call_with { + trace!( + target: "executor", + "Request for native execution failed (native: {}, chain: {})", + self.native_version.runtime_version, + onchain_version, + ); + } with_externalities_safe( &mut **ext, - move || instance.call(method, data).map(NativeOrEncoded::Encoded) - ) - } - (false, _, _) => { - with_externalities_safe( - &mut **ext, - move || instance.call(method, data).map(NativeOrEncoded::Encoded) + move || { + RuntimeInstanceSpawn::register_on_externalities(module.clone()); + instance.call_export(method, data).map(NativeOrEncoded::Encoded) + } ) }, (true, true, Some(call)) => { diff --git a/client/executor/src/wasm_runtime.rs b/client/executor/src/wasm_runtime.rs index 87a08f714dc93..7288df35f31c4 100644 --- a/client/executor/src/wasm_runtime.rs +++ b/client/executor/src/wasm_runtime.rs @@ -53,7 +53,7 @@ struct VersionedRuntime { /// Wasm runtime type. wasm_method: WasmExecutionMethod, /// Shared runtime that can spawn instances. - module: Box, + module: Arc, /// The number of WebAssembly heap pages this instance was created with. heap_pages: u64, /// Runtime version according to `Core_version` if any. @@ -70,6 +70,7 @@ impl VersionedRuntime { f: F, ) -> Result where F: FnOnce( + &Arc, &dyn WasmInstance, Option<&RuntimeVersion>, &mut dyn Externalities) @@ -87,7 +88,7 @@ impl VersionedRuntime { .map(|r| Ok((r, false))) .unwrap_or_else(|| self.module.new_instance().map(|i| (i, true)))?; - let result = f(&*instance, self.version.as_ref(), ext); + let result = f(&self.module, &*instance, self.version.as_ref(), ext); if let Err(e) = &result { if new_inst { log::warn!( @@ -123,7 +124,7 @@ impl VersionedRuntime { // Allocate a new instance let instance = self.module.new_instance()?; - f(&*instance, self.version.as_ref(), ext) + f(&self.module, &*instance, self.version.as_ref(), ext) } } } @@ -199,6 +200,7 @@ impl RuntimeCache { f: F, ) -> Result, Error> where F: FnOnce( + &Arc, &dyn WasmInstance, Option<&RuntimeVersion>, &mut dyn Externalities) @@ -267,7 +269,7 @@ pub fn create_wasm_runtime_with_code( code: &[u8], host_functions: Vec<&'static dyn Function>, allow_missing_func_imports: bool, -) -> Result, WasmError> { +) -> Result, WasmError> { match wasm_method { WasmExecutionMethod::Interpreted => sc_executor_wasmi::create_runtime( @@ -275,7 +277,7 @@ pub fn create_wasm_runtime_with_code( heap_pages, host_functions, allow_missing_func_imports - ).map(|runtime| -> Box { Box::new(runtime) }), + ).map(|runtime| -> Arc { Arc::new(runtime) }), #[cfg(feature = "wasmtime")] WasmExecutionMethod::Compiled => sc_executor_wasmtime::create_runtime( @@ -283,7 +285,7 @@ pub fn create_wasm_runtime_with_code( heap_pages, host_functions, allow_missing_func_imports - ).map(|runtime| -> Box { Box::new(runtime) }), + ).map(|runtime| -> Arc { Arc::new(runtime) }), } } @@ -318,7 +320,7 @@ fn create_versioned_wasm_runtime( ) -> Result { #[cfg(not(target_os = "unknown"))] let time = std::time::Instant::now(); - let mut runtime = create_wasm_runtime_with_code( + let runtime = create_wasm_runtime_with_code( wasm_method, heap_pages, &code, @@ -333,10 +335,10 @@ fn create_versioned_wasm_runtime( // The following unwind safety assertion is OK because if the method call panics, the // runtime will be dropped. - let runtime = AssertUnwindSafe(runtime.as_mut()); + let runtime = AssertUnwindSafe(runtime.as_ref()); crate::native_executor::with_externalities_safe( &mut **ext, - move || runtime.new_instance()?.call("Core_version", &[]) + move || runtime.new_instance()?.call("Core_version".into(), &[]) ).map_err(|_| WasmError::Instantiation("panic in call to get runtime version".into()))? }; let version = match version_result { diff --git a/client/executor/wasmi/src/lib.rs b/client/executor/wasmi/src/lib.rs index 1632aa3c18ad5..17b92e04950c9 100644 --- a/client/executor/wasmi/src/lib.rs +++ b/client/executor/wasmi/src/lib.rs @@ -19,7 +19,7 @@ use std::{str, cell::RefCell, sync::Arc}; use wasmi::{ Module, ModuleInstance, MemoryInstance, MemoryRef, TableRef, ImportsBuilder, ModuleRef, - memory_units::Pages, + FuncInstance, memory_units::Pages, RuntimeValue::{I32, I64, self}, }; use codec::{Encode, Decode}; @@ -29,7 +29,7 @@ use sp_wasm_interface::{ FunctionContext, Pointer, WordSize, Sandbox, MemoryId, Result as WResult, Function, }; use sp_runtime_interface::unpack_ptr_and_len; -use sc_executor_common::wasm_runtime::{WasmModule, WasmInstance}; +use sc_executor_common::wasm_runtime::{WasmModule, WasmInstance, InvokeMethod}; use sc_executor_common::{ error::{Error, WasmError}, sandbox, @@ -434,7 +434,7 @@ fn get_heap_base(module: &ModuleRef) -> Result { fn call_in_wasm_module( module_instance: &ModuleRef, memory: &MemoryRef, - method: &str, + method: InvokeMethod, data: &[u8], host_functions: &[&'static dyn Function], allow_missing_func_imports: bool, @@ -446,24 +446,49 @@ fn call_in_wasm_module( .and_then(|e| e.as_table().cloned()); let heap_base = get_heap_base(module_instance)?; - let mut fec = FunctionExecutor::new( + let mut function_executor = FunctionExecutor::new( memory.clone(), heap_base, - table, + table.clone(), host_functions, allow_missing_func_imports, missing_functions, )?; // Write the call data - let offset = fec.allocate_memory(data.len() as u32)?; - fec.write_memory(offset, data)?; - - let result = module_instance.invoke_export( - method, - &[I32(u32::from(offset) as i32), I32(data.len() as i32)], - &mut fec, - ); + let offset = function_executor.allocate_memory(data.len() as u32)?; + function_executor.write_memory(offset, data)?; + + let result = match method { + InvokeMethod::Export(method) => { + module_instance.invoke_export( + method, + &[I32(u32::from(offset) as i32), I32(data.len() as i32)], + &mut function_executor, + ) + }, + InvokeMethod::Table(func_ref) => { + let func = table.ok_or(Error::NoTable)? + .get(func_ref)? + .ok_or(Error::NoTableEntryWithIndex(func_ref))?; + FuncInstance::invoke( + &func, + &[I32(u32::from(offset) as i32), I32(data.len() as i32)], + &mut function_executor, + ).map_err(Into::into) + }, + InvokeMethod::TableWithWrapper { dispatcher_ref, func } => { + let dispatcher = table.ok_or(Error::NoTable)? + .get(dispatcher_ref)? + .ok_or(Error::NoTableEntryWithIndex(dispatcher_ref))?; + + FuncInstance::invoke( + &dispatcher, + &[I32(func as _), I32(u32::from(offset) as i32), I32(data.len() as i32)], + &mut function_executor, + ).map_err(Into::into) + }, + }; match result { Ok(Some(I64(r))) => { @@ -474,7 +499,7 @@ fn call_in_wasm_module( trace!( target: "wasm-executor", "Failed to execute code with {} pages", - memory.current_size().0 + memory.current_size().0, ); Err(e.into()) }, @@ -677,7 +702,7 @@ pub struct WasmiInstance { unsafe impl Send for WasmiInstance {} impl WasmInstance for WasmiInstance { - fn call(&self, method: &str, data: &[u8]) -> Result, Error> { + fn call(&self, method: InvokeMethod, data: &[u8]) -> Result, Error> { // We reuse a single wasm instance for multiple calls and a previous call (if any) // altered the state. Therefore, we need to restore the instance to original state. diff --git a/client/executor/wasmtime/src/instance_wrapper.rs b/client/executor/wasmtime/src/instance_wrapper.rs index 9a4e44d3b106c..089d8cb237b56 100644 --- a/client/executor/wasmtime/src/instance_wrapper.rs +++ b/client/executor/wasmtime/src/instance_wrapper.rs @@ -26,6 +26,7 @@ use std::{slice, marker}; use sc_executor_common::{ error::{Error, Result}, util::{WasmModuleInfo, DataSegmentsSnapshot}, + wasm_runtime::InvokeMethod, }; use sp_wasm_interface::{Pointer, WordSize, Value}; use wasmtime::{Engine, Instance, Module, Memory, Table, Val, Func, Extern, Global, Store}; @@ -72,6 +73,82 @@ impl ModuleWrapper { } } +/// Invoked entrypoint format. +pub enum EntryPointType { + /// Direct call. + /// + /// Call is made by providing only payload reference and length. + Direct, + /// Indirect call. + /// + /// Call is made by providing payload reference and length, and extra argument + /// for advanced routing (typically extra WASM function pointer). + Wrapped(u32), +} + +/// Wasm blob entry point. +pub struct EntryPoint { + call_type: EntryPointType, + func: wasmtime::Func, +} + +impl EntryPoint { + /// Call this entry point. + pub fn call(&self, data_ptr: Pointer, data_len: WordSize) -> Result { + let data_ptr = u32::from(data_ptr) as i32; + let data_len = u32::from(data_len) as i32; + + (match self.call_type { + EntryPointType::Direct => { + self.func.call(&[ + wasmtime::Val::I32(data_ptr), + wasmtime::Val::I32(data_len), + ]) + }, + EntryPointType::Wrapped(func) => { + self.func.call(&[ + wasmtime::Val::I32(func as _), + wasmtime::Val::I32(data_ptr), + wasmtime::Val::I32(data_len), + ]) + }, + }) + .map(|results| + // the signature is checked to have i64 return type + results[0].unwrap_i64() as u64 + ) + .map_err(|err| Error::from(format!( + "Wasm execution trapped: {}", + err + ))) + } + + pub fn direct(func: wasmtime::Func) -> std::result::Result { + match (func.ty().params(), func.ty().results()) { + (&[wasmtime::ValType::I32, wasmtime::ValType::I32], &[wasmtime::ValType::I64]) => { + Ok(Self { func, call_type: EntryPointType::Direct }) + } + _ => { + Err("Invalid signature for direct entry point") + } + } + } + + pub fn wrapped(dispatcher: wasmtime::Func, func: u32) -> std::result::Result { + match (dispatcher.ty().params(), dispatcher.ty().results()) { + ( + &[wasmtime::ValType::I32, wasmtime::ValType::I32, wasmtime::ValType::I32], + &[wasmtime::ValType::I64], + ) => { + Ok(Self { func: dispatcher, call_type: EntryPointType::Wrapped(func) }) + }, + _ => { + Err("Invalid signature for wrapped entry point") + } + } + } +} + /// Wrap the given WebAssembly Instance of a wasm module with Substrate-runtime. /// /// This struct is a handy wrapper around a wasmtime `Instance` that provides substrate specific @@ -150,24 +227,62 @@ impl InstanceWrapper { /// /// An entrypoint must have a signature `(i32, i32) -> i64`, otherwise this function will return /// an error. - pub fn resolve_entrypoint(&self, name: &str) -> Result { - // Resolve the requested method and verify that it has a proper signature. - let export = self - .instance - .get_export(name) - .ok_or_else(|| Error::from(format!("Exported method {} is not found", name)))?; - let entrypoint = extern_func(&export) - .ok_or_else(|| Error::from(format!("Export {} is not a function", name)))?; - match (entrypoint.ty().params(), entrypoint.ty().results()) { - (&[wasmtime::ValType::I32, wasmtime::ValType::I32], &[wasmtime::ValType::I64]) => {} - _ => { - return Err(Error::from(format!( - "method {} have an unsupported signature", - name - ))) - } - } - Ok(entrypoint.clone()) + pub fn resolve_entrypoint(&self, method: InvokeMethod) -> Result { + Ok(match method { + InvokeMethod::Export(method) => { + // Resolve the requested method and verify that it has a proper signature. + let export = self + .instance + .get_export(method) + .ok_or_else(|| Error::from(format!("Exported method {} is not found", method)))?; + let func = extern_func(&export) + .ok_or_else(|| Error::from(format!("Export {} is not a function", method)))? + .clone(); + EntryPoint::direct(func) + .map_err(|_| + Error::from(format!( + "Exported function '{}' has invalid signature.", + method, + )) + )? + }, + InvokeMethod::Table(func_ref) => { + let table = self.instance.get_table("__indirect_function_table").ok_or(Error::NoTable)?; + let val = table.get(func_ref) + .ok_or(Error::NoTableEntryWithIndex(func_ref))?; + let func = val + .funcref() + .ok_or(Error::TableElementIsNotAFunction(func_ref))? + .ok_or(Error::FunctionRefIsNull(func_ref))? + .clone(); + + EntryPoint::direct(func) + .map_err(|_| + Error::from(format!( + "Function @{} in exported table has invalid signature for direct call.", + func_ref, + )) + )? + }, + InvokeMethod::TableWithWrapper { dispatcher_ref, func } => { + let table = self.instance.get_table("__indirect_function_table").ok_or(Error::NoTable)?; + let val = table.get(dispatcher_ref) + .ok_or(Error::NoTableEntryWithIndex(dispatcher_ref))?; + let dispatcher = val + .funcref() + .ok_or(Error::TableElementIsNotAFunction(dispatcher_ref))? + .ok_or(Error::FunctionRefIsNull(dispatcher_ref))? + .clone(); + + EntryPoint::wrapped(dispatcher, func) + .map_err(|_| + Error::from(format!( + "Function @{} in exported table has invalid signature for wrapped call.", + dispatcher_ref, + )) + )? + }, + }) } /// Returns an indirect function table of this instance. diff --git a/client/executor/wasmtime/src/runtime.rs b/client/executor/wasmtime/src/runtime.rs index 365770b3fa866..965b067535721 100644 --- a/client/executor/wasmtime/src/runtime.rs +++ b/client/executor/wasmtime/src/runtime.rs @@ -18,14 +18,14 @@ use crate::host::HostState; use crate::imports::{Imports, resolve_imports}; -use crate::instance_wrapper::{ModuleWrapper, InstanceWrapper, GlobalsSnapshot}; +use crate::instance_wrapper::{ModuleWrapper, InstanceWrapper, GlobalsSnapshot, EntryPoint}; use crate::state_holder; use std::rc::Rc; use std::sync::Arc; use sc_executor_common::{ - error::{Error, Result, WasmError}, - wasm_runtime::{WasmModule, WasmInstance}, + error::{Result, WasmError}, + wasm_runtime::{WasmModule, WasmInstance, InvokeMethod}, }; use sp_allocator::FreeingBumpHeapAllocator; use sp_runtime_interface::unpack_ptr_and_len; @@ -90,7 +90,7 @@ pub struct WasmtimeInstance { unsafe impl Send for WasmtimeInstance {} impl WasmInstance for WasmtimeInstance { - fn call(&self, method: &str, data: &[u8]) -> Result> { + fn call(&self, method: InvokeMethod, data: &[u8]) -> Result> { let entrypoint = self.instance_wrapper.resolve_entrypoint(method)?; let allocator = FreeingBumpHeapAllocator::new(self.heap_base); @@ -146,28 +146,14 @@ pub fn create_runtime( fn perform_call( data: &[u8], instance_wrapper: Rc, - entrypoint: wasmtime::Func, + entrypoint: EntryPoint, mut allocator: FreeingBumpHeapAllocator, ) -> Result> { let (data_ptr, data_len) = inject_input_data(&instance_wrapper, &mut allocator, data)?; let host_state = HostState::new(allocator, instance_wrapper.clone()); - let ret = state_holder::with_initialized_state(&host_state, || { - match entrypoint.call(&[ - wasmtime::Val::I32(u32::from(data_ptr) as i32), - wasmtime::Val::I32(u32::from(data_len) as i32), - ]) { - Ok(results) => { - let retval = results[0].unwrap_i64() as u64; - Ok(unpack_ptr_and_len(retval)) - } - Err(trap) => { - return Err(Error::from(format!( - "Wasm execution trapped: {}", - trap - ))); - } - } + let ret = state_holder::with_initialized_state(&host_state, || -> Result<_> { + Ok(unpack_ptr_and_len(entrypoint.call(data_ptr, data_len)?)) }); let (output_ptr, output_len) = ret?; let output = extract_output_data(&instance_wrapper, output_ptr, output_len)?; diff --git a/frame/example-parallel/Cargo.toml b/frame/example-parallel/Cargo.toml new file mode 100644 index 0000000000000..01a612fb82fbf --- /dev/null +++ b/frame/example-parallel/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "pallet-example-parallel" +version = "2.0.0" +authors = ["Parity Technologies "] +edition = "2018" +license = "Unlicense" +homepage = "https://substrate.dev" +repository = "https://github.com/paritytech/substrate/" +description = "FRAME example pallet using runtime worker threads" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +codec = { package = "parity-scale-codec", version = "1.3.4", default-features = false } +frame-support = { version = "2.0.0", default-features = false, path = "../support" } +frame-system = { version = "2.0.0", default-features = false, path = "../system" } +sp-core = { version = "2.0.0", default-features = false, path = "../../primitives/core" } +sp-io = { version = "2.0.0", default-features = false, path = "../../primitives/io" } +sp-runtime = { version = "2.0.0", default-features = false, path = "../../primitives/runtime" } +sp-std = { version = "2.0.0", default-features = false, path = "../../primitives/std" } +sp-tasks = { version = "2.0.0", default-features = false, path = "../../primitives/tasks" } + +[features] +default = ["std"] +std = [ + "codec/std", + "frame-support/std", + "frame-system/std", + "sp-core/std", + "sp-io/std", + "sp-runtime/std", + "sp-std/std", + "sp-tasks/std", +] diff --git a/frame/example-parallel/src/lib.rs b/frame/example-parallel/src/lib.rs new file mode 100644 index 0000000000000..4b7ce72b4d40e --- /dev/null +++ b/frame/example-parallel/src/lib.rs @@ -0,0 +1,152 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Parallel tasks example +//! +//! This example pallet parallelizes validation of the enlisted participants +//! (see `enlist_participants` dispatch). + +#![cfg_attr(not(feature = "std"), no_std)] + +use frame_system::ensure_signed; +use frame_support::{ + dispatch::DispatchResult, decl_module, decl_storage, decl_event, +}; +use sp_runtime::RuntimeDebug; + +use codec::{Encode, Decode}; +use sp_std::vec::Vec; + +#[cfg(test)] +mod tests; + +pub trait Trait: frame_system::Trait { + /// The overarching event type. + type Event: From + Into<::Event>; + /// The overarching dispatch call type. + type Call: From>; +} + +decl_storage! { + trait Store for Module as ExampleOffchainWorker { + /// A vector of current participants + /// + /// To enlist someone to participate, signed payload should be + /// sent to `enlist`. + Participants get(fn participants): Vec>; + + /// Current event id to enlist participants to. + CurrentEventId get(fn get_current_event_id): Vec; + } +} + +decl_event!( + /// Events generated by the module. + pub enum Event { + /// When new event is drafted. + NewEventDrafted(Vec), + } +); + +/// Request to enlist participant. +#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] +pub struct EnlistedParticipant { + pub account: Vec, + pub signature: Vec, +} + +impl EnlistedParticipant { + fn verify(&self, event_id: &[u8]) -> bool { + use sp_core::Public; + use std::convert::TryFrom; + use sp_runtime::traits::Verify; + + match sp_core::sr25519::Signature::try_from(&self.signature[..]) { + Ok(signature) => { + let public = sp_core::sr25519::Public::from_slice(self.account.as_ref()); + signature.verify(event_id, &public) + } + _ => false + } + } +} + +decl_module! { + /// A public part of the pallet. + pub struct Module for enum Call where origin: T::Origin { + fn deposit_event() = default; + + /// Get the new event running. + #[weight = 0] + pub fn run_event(origin, id: Vec) -> DispatchResult { + let _ = ensure_signed(origin)?; + Participants::kill(); + CurrentEventId::mutate(move |event_id| *event_id = id); + Ok(()) + } + + /// Submit list of participants to the current event. + /// + /// The example utilizes parallel execution by checking half of the + /// signatures in spawned task. + #[weight = 0] + pub fn enlist_participants(origin, participants: Vec) + -> DispatchResult + { + let _ = ensure_signed(origin)?; + + if validate_participants_parallel(&CurrentEventId::get(), &participants[..]) { + for participant in participants { + Participants::append(participant.account); + } + } + Ok(()) + } + } +} + +fn validate_participants_parallel(event_id: &[u8], participants: &[EnlistedParticipant]) -> bool { + + fn spawn_verify(data: Vec) -> Vec { + let stream = &mut &data[..]; + let event_id = Vec::::decode(stream).expect("Failed to decode"); + let participants = Vec::::decode(stream).expect("Failed to decode"); + + for participant in participants { + if !participant.verify(&event_id) { + return false.encode() + } + } + true.encode() + } + + let mut async_payload = Vec::new(); + event_id.encode_to(&mut async_payload); + participants[..participants.len() / 2].encode_to(&mut async_payload); + + let handle = sp_tasks::spawn(spawn_verify, async_payload); + let mut result = true; + + for participant in &participants[participants.len()/2+1..] { + if !participant.verify(event_id) { + result = false; + break; + } + } + + bool::decode(&mut &handle.join()[..]).expect("Failed to decode result") && result +} diff --git a/frame/example-parallel/src/tests.rs b/frame/example-parallel/src/tests.rs new file mode 100644 index 0000000000000..1da8c60388266 --- /dev/null +++ b/frame/example-parallel/src/tests.rs @@ -0,0 +1,151 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::*; + +use codec::{Encode, Decode}; +use frame_support::{impl_outer_origin, parameter_types, weights::Weight}; +use sp_core::H256; +use sp_runtime::{ + Perbill, + testing::{Header}, + traits::{BlakeTwo256, IdentityLookup}, +}; + +impl_outer_origin! { + pub enum Origin for Test where system = frame_system {} +} + +#[derive(Clone, Eq, PartialEq, Encode, Decode)] +pub struct Test; +parameter_types! { + pub const BlockHashCount: u64 = 250; + pub const MaximumBlockWeight: Weight = 1024; + pub const MaximumBlockLength: u32 = 2 * 1024; + pub const AvailableBlockRatio: Perbill = Perbill::one(); +} + +impl frame_system::Trait for Test { + type BaseCallFilter = (); + type Origin = Origin; + type Call = (); + type PalletInfo = (); + type Index = u64; + type BlockNumber = u64; + type Hash = H256; + type Hashing = BlakeTwo256; + type AccountId = sp_core::sr25519::Public; + type Lookup = IdentityLookup; + type Header = Header; + type Event = (); + type BlockHashCount = BlockHashCount; + type MaximumBlockWeight = MaximumBlockWeight; + type DbWeight = (); + type BlockExecutionWeight = (); + type ExtrinsicBaseWeight = (); + type MaximumExtrinsicWeight = MaximumBlockWeight; + type MaximumBlockLength = MaximumBlockLength; + type AvailableBlockRatio = AvailableBlockRatio; + type Version = (); + type AccountData = (); + type OnNewAccount = (); + type OnKilledAccount = (); + type SystemWeightInfo = (); +} + +parameter_types! { + pub const GracePeriod: u64 = 5; + pub const UnsignedInterval: u64 = 128; + pub const UnsignedPriority: u64 = 1 << 20; +} + +impl Trait for Test { + type Event = (); + type Call = Call; +} + +type Example = Module; + +#[test] +fn it_can_enlist() { + use sp_core::Pair; + + sp_io::TestExternalities::default().execute_with(|| { + let (pair1, _) = sp_core::sr25519::Pair::generate(); + let (pair2, _) = sp_core::sr25519::Pair::generate(); + + let event_name = b"test"; + + Example::run_event(Origin::signed(Default::default()), event_name.to_vec()) + .expect("Failed to enlist"); + + let participants = vec![ + EnlistedParticipant { + account: pair1.public().to_vec(), + signature: AsRef::<[u8]>::as_ref(&pair1.sign(event_name)).to_vec(), + }, + EnlistedParticipant { + account: pair2.public().to_vec(), + signature: AsRef::<[u8]>::as_ref(&pair2.sign(event_name)).to_vec(), + }, + ]; + + Example::enlist_participants(Origin::signed(Default::default()), participants) + .expect("Failed to enlist"); + + assert_eq!(Example::participants().len(), 2); + }); + +} + +#[test] +fn one_wrong_will_not_enlist_anyone() { + use sp_core::Pair; + + sp_io::TestExternalities::default().execute_with(|| { + let (pair1, _) = sp_core::sr25519::Pair::generate(); + let (pair2, _) = sp_core::sr25519::Pair::generate(); + let (pair3, _) = sp_core::sr25519::Pair::generate(); + + let event_name = b"test"; + + Example::run_event(Origin::signed(Default::default()), event_name.to_vec()) + .expect("Failed to enlist"); + + let participants = vec![ + EnlistedParticipant { + account: pair1.public().to_vec(), + signature: AsRef::<[u8]>::as_ref(&pair1.sign(event_name)).to_vec(), + }, + EnlistedParticipant { + account: pair2.public().to_vec(), + signature: AsRef::<[u8]>::as_ref(&pair2.sign(event_name)).to_vec(), + }, + // signing wrong event + EnlistedParticipant { + account: pair3.public().to_vec(), + signature: AsRef::<[u8]>::as_ref(&pair3.sign(&[])).to_vec(), + }, + ]; + + Example::enlist_participants(Origin::signed(Default::default()), participants) + .expect("Failed to enlist"); + + assert_eq!(Example::participants().len(), 0); + }); + +} diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 453bca26f7113..406dba533899b 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -186,6 +186,25 @@ impl TaskExecutorExt { } } +/// Runtime spawn extension. +pub trait RuntimeSpawn: Send { + /// Create new runtime instance and use dynamic dispatch to invoke with specified payload. + /// + /// Returns handle of the spawned task. + /// + /// Function pointers (`dispatcher_ref`, `func`) are WASM pointer types. + fn spawn_call(&self, dispatcher_ref: u32, func: u32, payload: Vec) -> u64; + + /// Join the result of previously created runtime instance invocation. + fn join(&self, handle: u64) -> Vec; +} + +#[cfg(feature = "std")] +sp_externalities::decl_extension! { + /// Extension that supports spawning extra runtime instances in externalities. + pub struct RuntimeSpawnExt(Box); +} + /// Something that can spawn futures (blocking and non-blocking) with an assigned name. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { diff --git a/primitives/externalities/src/extensions.rs b/primitives/externalities/src/extensions.rs index ee45148487d21..a7f5ee8bc739e 100644 --- a/primitives/externalities/src/extensions.rs +++ b/primitives/externalities/src/extensions.rs @@ -79,6 +79,12 @@ macro_rules! decl_extension { &mut self.0 } } + + impl From<$inner> for $ext_name { + fn from(inner: $inner) -> Self { + Self(inner) + } + } } } diff --git a/primitives/io/src/lib.rs b/primitives/io/src/lib.rs index 6a3bb472771c0..382a0c4b3bd6a 100644 --- a/primitives/io/src/lib.rs +++ b/primitives/io/src/lib.rs @@ -38,7 +38,7 @@ use tracing; #[cfg(feature = "std")] use sp_core::{ crypto::Pair, - traits::{CallInWasmExt, TaskExecutorExt}, + traits::{CallInWasmExt, TaskExecutorExt, RuntimeSpawnExt}, offchain::{OffchainExt, TransactionPoolExt}, hexdisplay::HexDisplay, storage::ChildInfo, @@ -520,7 +520,6 @@ pub trait Crypto { fn start_batch_verify(&mut self) { let scheduler = self.extension::() .expect("No task executor associated with the current context!") - .0 .clone(); self.register_extension(VerificationExt(BatchVerifier::new(scheduler))) @@ -755,7 +754,7 @@ pub trait OffchainIndex { #[cfg(feature = "std")] sp_externalities::decl_extension! { - /// The keystore extension to register/retrieve from the externalities. + /// Batch verification extension to register/retrieve from the externalities. pub struct VerificationExt(BatchVerifier); } @@ -1239,6 +1238,34 @@ pub trait Sandbox { } } +/// Wasm host functions for managing tasks. +/// +/// This should not be used directly. Use `sp_tasks` for running parallel tasks instead. +#[runtime_interface(wasm_only)] +pub trait RuntimeTasks { + /// Wasm host function for spawning task. + /// + /// This should not be used directly. Use `sp_tasks::spawn` instead. + fn spawn(dispatcher_ref: u32, entry: u32, payload: Vec) -> u64 { + sp_externalities::with_externalities(|mut ext|{ + let runtime_spawn = ext.extension::() + .expect("Cannot spawn without dynamic runtime dispatcher (RuntimeSpawnExt)"); + runtime_spawn.spawn_call(dispatcher_ref, entry, payload) + }).expect("`RuntimeTasks::spawn`: called outside of externalities context") + } + + /// Wasm host function for joining a task. + /// + /// This should not be used directly. Use `join` of `sp_tasks::spawn` result instead. + fn join(handle: u64) -> Vec { + sp_externalities::with_externalities(|mut ext| { + let runtime_spawn = ext.extension::() + .expect("Cannot join without dynamic runtime dispatcher (RuntimeSpawnExt)"); + runtime_spawn.join(handle) + }).expect("`RuntimeTasks::join`: called outside of externalities context") + } + } + /// Allocator used by Substrate when executing the Wasm runtime. #[cfg(not(feature = "std"))] struct WasmAllocator; @@ -1306,6 +1333,7 @@ pub type SubstrateHostFunctions = ( sandbox::HostFunctions, crate::trie::HostFunctions, offchain_index::HostFunctions, + runtime_tasks::HostFunctions, ); #[cfg(test)] diff --git a/primitives/runtime-interface/src/lib.rs b/primitives/runtime-interface/src/lib.rs index 2273e453f104a..7ff5f0d7a042d 100644 --- a/primitives/runtime-interface/src/lib.rs +++ b/primitives/runtime-interface/src/lib.rs @@ -312,7 +312,7 @@ pub mod pass_by; mod util; -pub use util::unpack_ptr_and_len; +pub use util::{unpack_ptr_and_len, pack_ptr_and_len}; /// Something that can be used by the runtime interface as type to communicate between wasm and the /// host. diff --git a/primitives/state-machine/src/basic.rs b/primitives/state-machine/src/basic.rs index 76d53659db64b..5e3c9bed64f10 100644 --- a/primitives/state-machine/src/basic.rs +++ b/primitives/state-machine/src/basic.rs @@ -18,7 +18,7 @@ //! Basic implementation for Externalities. use std::{ - collections::BTreeMap, any::{TypeId, Any}, iter::FromIterator, ops::Bound + collections::BTreeMap, any::{TypeId, Any}, iter::FromIterator, ops::Bound, }; use crate::{Backend, StorageKey, StorageValue}; use hash_db::Hasher; diff --git a/primitives/state-machine/src/testing.rs b/primitives/state-machine/src/testing.rs index de68d7e415cdd..4dcd308285625 100644 --- a/primitives/state-machine/src/testing.rs +++ b/primitives/state-machine/src/testing.rs @@ -17,9 +17,8 @@ //! Test implementation for Externalities. -use std::any::{Any, TypeId}; -use codec::Decode; -use hash_db::Hasher; +use std::{any::{Any, TypeId}, panic::{AssertUnwindSafe, UnwindSafe}}; + use crate::{ backend::Backend, OverlayedChanges, StorageTransactionCache, ext::Ext, InMemoryBackend, StorageKey, StorageValue, @@ -30,6 +29,9 @@ use crate::{ State as ChangesTrieState, }, }; + +use codec::{Decode, Encode}; +use hash_db::Hasher; use sp_core::{ offchain::{ testing::TestPersistentOffchainDB, @@ -42,7 +44,6 @@ use sp_core::{ traits::TaskExecutorExt, testing::TaskExecutor, }; -use codec::Encode; use sp_externalities::{Extensions, Extension}; /// Simple HashMap-based Externalities impl. @@ -178,6 +179,19 @@ impl TestExternalities let mut ext = self.ext(); sp_externalities::set_and_run_with_externalities(&mut ext, execute) } + + /// Execute the given closure while `self` is set as externalities. + /// + /// Returns the result of the given closure, if no panics occured. + /// Otherwise, returns `Err`. + pub fn execute_with_safe(&mut self, f: impl FnOnce() -> R + UnwindSafe) -> Result { + let mut ext = AssertUnwindSafe(self.ext()); + std::panic::catch_unwind(move || + sp_externalities::set_and_run_with_externalities(&mut *ext, f) + ).map_err(|e| { + format!("Closure panicked: {:?}", e) + }) + } } impl std::fmt::Debug for TestExternalities diff --git a/primitives/tasks/Cargo.toml b/primitives/tasks/Cargo.toml new file mode 100644 index 0000000000000..0c0f410824c81 --- /dev/null +++ b/primitives/tasks/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "sp-tasks" +version = "2.0.0" +authors = ["Parity Technologies "] +edition = "2018" +license = "Apache-2.0" +homepage = "https://substrate.dev" +repository = "https://github.com/paritytech/substrate/" +description = "Runtime asynchronous, pure computational tasks" +documentation = "https://docs.rs/sp-tasks" +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +log = { version = "0.4.8", optional = true } +sp-core = { version = "2.0.0", default-features = false, path = "../core" } +sp-externalities = { version = "0.8.0", optional = true, path = "../externalities" } +sp-io = { version = "2.0.0", default-features = false, path = "../io" } +sp-runtime-interface = { version = "2.0.0", default-features = false, path = "../runtime-interface" } +sp-std = { version = "2.0.0", default-features = false, path = "../std" } + +[dev-dependencies] +codec = { package = "parity-scale-codec", default-features = false, version = "1.3.1" } + +[features] +default = ["std"] +std = [ + "log", + "sp-core/std", + "sp-externalities", + "sp-io/std", + "sp-runtime-interface/std", + "sp-std/std", +] diff --git a/primitives/tasks/README.md b/primitives/tasks/README.md new file mode 100644 index 0000000000000..1235e1bd933d4 --- /dev/null +++ b/primitives/tasks/README.md @@ -0,0 +1,3 @@ +Runtime asynchronous, pure computational tasks. + +License: Apache-2.0 \ No newline at end of file diff --git a/primitives/tasks/src/async_externalities.rs b/primitives/tasks/src/async_externalities.rs new file mode 100644 index 0000000000000..8994d069e4c76 --- /dev/null +++ b/primitives/tasks/src/async_externalities.rs @@ -0,0 +1,215 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Async externalities. + +use std::any::{TypeId, Any}; +use sp_core::{ + storage::{ChildInfo, TrackedStorageKey}, + traits::{Externalities, SpawnNamed, TaskExecutorExt, RuntimeSpawnExt, RuntimeSpawn}, +}; +use sp_externalities::{Extensions, ExternalitiesExt as _}; + +/// Simple state-less externalities for use in async context. +/// +/// Will panic if anything is accessing the storage. +#[derive(Debug)] +pub struct AsyncExternalities { + extensions: Extensions, +} + +/// New Async externalities. +pub fn new_async_externalities(scheduler: Box) -> Result { + let mut res = AsyncExternalities { extensions: Default::default() }; + let mut ext = &mut res as &mut dyn Externalities; + ext.register_extension::(TaskExecutorExt(scheduler.clone())) + .map_err(|_| "Failed to register task executor extension.")?; + + Ok(res) +} + +impl AsyncExternalities { + /// Extend async externalities with the ability to spawn wasm instances. + pub fn with_runtime_spawn( + mut self, + runtime_ext: Box, + ) -> Result { + let mut ext = &mut self as &mut dyn Externalities; + ext.register_extension::(RuntimeSpawnExt(runtime_ext)) + .map_err(|_| "Failed to register task executor extension.")?; + + Ok(self) + } +} + +type StorageKey = Vec; + +type StorageValue = Vec; + +impl Externalities for AsyncExternalities { + fn set_offchain_storage(&mut self, _key: &[u8], _value: Option<&[u8]>) { + panic!("`set_offchain_storage`: should not be used in async externalities!") + } + + fn storage(&self, _key: &[u8]) -> Option { + panic!("`storage`: should not be used in async externalities!") + } + + fn storage_hash(&self, _key: &[u8]) -> Option> { + panic!("`storage_hash`: should not be used in async externalities!") + } + + fn child_storage( + &self, + _child_info: &ChildInfo, + _key: &[u8], + ) -> Option { + panic!("`child_storage`: should not be used in async externalities!") + } + + fn child_storage_hash( + &self, + _child_info: &ChildInfo, + _key: &[u8], + ) -> Option> { + panic!("`child_storage_hash`: should not be used in async externalities!") + } + + fn next_storage_key(&self, _key: &[u8]) -> Option { + panic!("`next_storage_key`: should not be used in async externalities!") + } + + fn next_child_storage_key( + &self, + _child_info: &ChildInfo, + _key: &[u8], + ) -> Option { + panic!("`next_child_storage_key`: should not be used in async externalities!") + } + + fn place_storage(&mut self, _key: StorageKey, _maybe_value: Option) { + panic!("`place_storage`: should not be used in async externalities!") + } + + fn place_child_storage( + &mut self, + _child_info: &ChildInfo, + _key: StorageKey, + _value: Option, + ) { + panic!("`place_child_storage`: should not be used in async externalities!") + } + + fn kill_child_storage( + &mut self, + _child_info: &ChildInfo, + ) { + panic!("`kill_child_storage`: should not be used in async externalities!") + } + + fn clear_prefix(&mut self, _prefix: &[u8]) { + panic!("`clear_prefix`: should not be used in async externalities!") + } + + fn clear_child_prefix( + &mut self, + _child_info: &ChildInfo, + _prefix: &[u8], + ) { + panic!("`clear_child_prefix`: should not be used in async externalities!") + } + + fn storage_append( + &mut self, + _key: Vec, + _value: Vec, + ) { + panic!("`storage_append`: should not be used in async externalities!") + } + + fn chain_id(&self) -> u64 { 42 } + + fn storage_root(&mut self) -> Vec { + panic!("`storage_root`: should not be used in async externalities!") + } + + fn child_storage_root( + &mut self, + _child_info: &ChildInfo, + ) -> Vec { + panic!("`child_storage_root`: should not be used in async externalities!") + } + + fn storage_changes_root(&mut self, _parent: &[u8]) -> Result>, ()> { + panic!("`storage_changes_root`: should not be used in async externalities!") + } + + fn storage_start_transaction(&mut self) { + unimplemented!("Transactions are not supported by AsyncExternalities"); + } + + fn storage_rollback_transaction(&mut self) -> Result<(), ()> { + unimplemented!("Transactions are not supported by AsyncExternalities"); + } + + fn storage_commit_transaction(&mut self) -> Result<(), ()> { + unimplemented!("Transactions are not supported by AsyncExternalities"); + } + + fn wipe(&mut self) {} + + fn commit(&mut self) {} + + fn read_write_count(&self) -> (u32, u32, u32, u32) { + unimplemented!("read_write_count is not supported in AsyncExternalities") + } + + fn reset_read_write_count(&mut self) { + unimplemented!("reset_read_write_count is not supported in AsyncExternalities") + } + + fn get_whitelist(&self) -> Vec { + unimplemented!("get_whitelist is not supported in AsyncExternalities") + } + + fn set_whitelist(&mut self, _: Vec) { + unimplemented!("set_whitelist is not supported in AsyncExternalities") + } +} + +impl sp_externalities::ExtensionStore for AsyncExternalities { + fn extension_by_type_id(&mut self, type_id: TypeId) -> Option<&mut dyn Any> { + self.extensions.get_mut(type_id) + } + + fn register_extension_with_type_id( + &mut self, + type_id: TypeId, + extension: Box, + ) -> Result<(), sp_externalities::Error> { + self.extensions.register_with_type_id(type_id, extension) + } + + fn deregister_extension_by_type_id(&mut self, type_id: TypeId) -> Result<(), sp_externalities::Error> { + if self.extensions.deregister(type_id) { + Ok(()) + } else { + Err(sp_externalities::Error::ExtensionIsNotRegistered(type_id)) + } + } +} diff --git a/primitives/tasks/src/lib.rs b/primitives/tasks/src/lib.rs new file mode 100644 index 0000000000000..030e178109d7b --- /dev/null +++ b/primitives/tasks/src/lib.rs @@ -0,0 +1,247 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Runtime tasks. +//! +//! Contains runtime-usable functions for spawning parallel purely computational tasks. +//! +//! NOTE: This is experimental API. +//! NOTE: When using in actual runtime, make sure you don't produce unbounded parallelism. +//! So this is bad example to use it: +//! ```rust +//! fn my_parallel_computator(data: Vec) -> Vec { +//! unimplemented!() +//! } +//! fn test(dynamic_variable: i32) { +//! for _ in 0..dynamic_variable { sp_tasks::spawn(my_parallel_computator, vec![]); } +//! } +//! ``` +//! +//! While this is a good example: +//! ```rust +//! use codec::Encode; +//! static STATIC_VARIABLE: i32 = 4; +//! +//! fn my_parallel_computator(data: Vec) -> Vec { +//! unimplemented!() +//! } +//! +//! fn test(computation_payload: Vec) { +//! let parallel_tasks = (0..STATIC_VARIABLE).map(|idx| +//! sp_tasks::spawn(my_parallel_computator, computation_payload.chunks(10).nth(idx as _).encode()) +//! ); +//! } +//! ``` +//! +//! When allowing unbounded parallelism, malicious transactions can exploit it and partition +//! network consensus based on how much resources nodes have. +//! + +#![cfg_attr(not(feature = "std"), no_std)] + +#[cfg(feature = "std")] +mod async_externalities; + +#[cfg(feature = "std")] +pub use async_externalities::{new_async_externalities, AsyncExternalities}; + +#[cfg(feature = "std")] +mod inner { + use std::{panic::AssertUnwindSafe, sync::mpsc}; + use sp_externalities::ExternalitiesExt as _; + use sp_core::traits::TaskExecutorExt; + + /// Task handle (wasm). + /// + /// This can be `join`-ed to get (blocking) the result of + /// the spawned task execution. + #[must_use] + pub struct DataJoinHandle { + receiver: mpsc::Receiver>, + } + + impl DataJoinHandle { + /// Join handle returned by `spawn` function + pub fn join(self) -> Vec { + self.receiver.recv().expect("Spawned runtime task terminated before sending result.") + } + } + + /// Spawn new runtime task (native). + pub fn spawn(entry_point: fn(Vec) -> Vec, data: Vec) -> DataJoinHandle { + let scheduler = sp_externalities::with_externalities(|mut ext| ext.extension::() + .expect("No task executor associated with the current context!") + .clone() + ).expect("Spawn called outside of externalities context!"); + + let (sender, receiver) = mpsc::channel(); + let extra_scheduler = scheduler.clone(); + scheduler.spawn("parallel-runtime-spawn", Box::pin(async move { + let result = match crate::new_async_externalities(extra_scheduler) { + Ok(mut ext) => { + let mut ext = AssertUnwindSafe(&mut ext); + match std::panic::catch_unwind(move || { + sp_externalities::set_and_run_with_externalities( + &mut **ext, + move || entry_point(data), + ) + }) { + Ok(result) => result, + Err(panic) => { + log::error!( + target: "runtime", + "Spawned task panicked: {:?}", + panic, + ); + + // This will drop sender without sending anything. + return; + } + } + }, + Err(e) => { + log::error!( + target: "runtime", + "Unable to run async task: {}", + e, + ); + + return; + }, + }; + + let _ = sender.send(result); + })); + + DataJoinHandle { receiver } + } +} + +#[cfg(not(feature = "std"))] +mod inner { + use core::mem; + use sp_std::prelude::*; + + /// Dispatch wrapper for wasm blob. + /// + /// Serves as trampoline to call any rust function with (Vec) -> Vec compiled + /// into the runtime. + /// + /// Function item should be provided with `func_ref`. Argument for the call + /// will be generated from bytes at `payload_ptr` with `payload_len`. + /// + /// NOTE: Since this dynamic dispatch function and the invoked function are compiled with + /// the same compiler, there should be no problem with ABI incompatibility. + extern "C" fn dispatch_wrapper(func_ref: *const u8, payload_ptr: *mut u8, payload_len: u32) -> u64 { + let payload_len = payload_len as usize; + let output = unsafe { + let payload = Vec::from_raw_parts(payload_ptr, payload_len, payload_len); + let ptr: fn(Vec) -> Vec = mem::transmute(func_ref); + (ptr)(payload) + }; + sp_runtime_interface::pack_ptr_and_len(output.as_ptr() as usize as _, output.len() as _) + } + + /// Spawn new runtime task (wasm). + pub fn spawn(entry_point: fn(Vec) -> Vec, payload: Vec) -> DataJoinHandle { + let func_ptr: usize = unsafe { mem::transmute(entry_point) }; + + let handle = sp_io::runtime_tasks::spawn( + dispatch_wrapper as usize as _, + func_ptr as u32, + payload, + ); + DataJoinHandle { handle } + } + + /// Task handle (wasm). + /// + /// This can be `join`-ed to get (blocking) the result of + /// the spawned task execution. + #[must_use] + pub struct DataJoinHandle { + handle: u64, + } + + impl DataJoinHandle { + /// Join handle returned by `spawn` function + pub fn join(self) -> Vec { + sp_io::runtime_tasks::join(self.handle) + } + } +} + +pub use inner::{DataJoinHandle, spawn}; + +#[cfg(test)] +mod tests { + + use super::*; + + fn async_runner(mut data: Vec) -> Vec { + data.sort(); + data + } + + fn async_panicker(_data: Vec) -> Vec { + panic!("panic in async panicker!") + } + + #[test] + fn basic() { + sp_io::TestExternalities::default().execute_with(|| { + let a1 = spawn(async_runner, vec![5, 2, 1]).join(); + assert_eq!(a1, vec![1, 2, 5]); + }) + } + + #[test] + fn panicking() { + let res = sp_io::TestExternalities::default().execute_with_safe(||{ + spawn(async_panicker, vec![5, 2, 1]).join(); + }); + + assert!(res.unwrap_err().contains("Closure panicked")); + } + + #[test] + fn many_joins() { + sp_io::TestExternalities::default().execute_with_safe(|| { + // converges to 1 only after 1000+ steps + let mut running_val = 9780657630u64; + let mut data = vec![]; + let handles = (0..1024).map( + |_| { + running_val = if running_val % 2 == 0 { + running_val / 2 + } else { + 3 * running_val + 1 + }; + data.push(running_val as u8); + (spawn(async_runner, data.clone()), data.clone()) + } + ).collect::>(); + + for (handle, mut data) in handles { + let result = handle.join(); + data.sort(); + + assert_eq!(result, data); + } + }).expect("Failed to run with externalities"); + } +}