diff --git a/Cargo.lock b/Cargo.lock index 44a47ad9cc..1fdde05966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,8 +95,9 @@ dependencies = [ [[package]] name = "air-interpreter-interface" version = "0.19.0" +source = "git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine#d4a222f4975090dbf37162cb431b158a49c14e0f" dependencies = [ - "air-interpreter-sede 0.1.0", + "air-interpreter-sede 0.1.0 (git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine)", "air-interpreter-value", "fluence-it-types", "marine-call-parameters 0.14.0", @@ -109,8 +110,9 @@ dependencies = [ [[package]] name = "air-interpreter-sede" version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433b1ca3538f5f6a6083646b2aa61cd57a08c1558091f6be47b2299fc7b28261" dependencies = [ - "marine-rs-sdk", "rmp-serde", "serde", "serde_bytes", @@ -122,9 +124,9 @@ dependencies = [ [[package]] name = "air-interpreter-sede" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433b1ca3538f5f6a6083646b2aa61cd57a08c1558091f6be47b2299fc7b28261" +source = "git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine#d4a222f4975090dbf37162cb431b158a49c14e0f" dependencies = [ + "marine-rs-sdk", "rmp-serde", "serde", "serde_bytes", @@ -136,6 +138,7 @@ dependencies = [ [[package]] name = "air-interpreter-value" version = "0.1.0" +source = "git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine#d4a222f4975090dbf37162cb431b158a49c14e0f" dependencies = [ "serde", "serde_json", @@ -153,6 +156,7 @@ dependencies = [ [[package]] name = "air-utils" version = "0.3.0" +source = "git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine#d4a222f4975090dbf37162cb431b158a49c14e0f" [[package]] name = "allocator-api2" @@ -737,6 +741,7 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "avm-data-store" version = "0.7.9" +source = "git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine#d4a222f4975090dbf37162cb431b158a49c14e0f" dependencies = [ "avm-interface", "serde", @@ -746,9 +751,10 @@ dependencies = [ [[package]] name = "avm-interface" version = "0.32.1" +source = "git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine#d4a222f4975090dbf37162cb431b158a49c14e0f" dependencies = [ "air-interpreter-interface", - "air-interpreter-sede 0.1.0", + "air-interpreter-sede 0.1.0 (git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine)", "air-utils", "log", "maplit", @@ -762,9 +768,10 @@ dependencies = [ [[package]] name = "avm-server" version = "0.37.0" +source = "git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine#d4a222f4975090dbf37162cb431b158a49c14e0f" dependencies = [ "air-interpreter-interface", - "air-interpreter-sede 0.1.0", + "air-interpreter-sede 0.1.0 (git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine)", "air-utils", "avm-data-store", "avm-interface", @@ -6249,6 +6256,7 @@ dependencies = [ "fluence-keypair", "fluence-libp2p", "fs-utils", + "futures", "health", "humantime-serde", "json-utils", @@ -6465,6 +6473,7 @@ dependencies = [ [[package]] name = "polyplets" version = "0.7.0" +source = "git+https://github.com/fluencelabs/aquavm?branch=feat/use-async-marine#d4a222f4975090dbf37162cb431b158a49c14e0f" dependencies = [ "marine-call-parameters 0.14.0", "serde", @@ -8132,6 +8141,7 @@ name = "system-services" version = "0.1.0" dependencies = [ "aqua-ipfs-distro", + "async-trait", "decider-distro", "eyre", "fluence-app-service", diff --git a/Cargo.toml b/Cargo.toml index 8769075f7d..01d5d0d6b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,11 +113,11 @@ fluence-spell-distro = "=0.7.5" fluence-app-service = { version = "=0.35.2", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call", features = ["wasmtime"] } marine-utils = "0.5.1" marine-it-parser = { version = "0.16.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" } -marine-module-info-parser = { version = "0.15.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" } -marine-wasmtime-backend = { version = "0.6.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" } +marine-module-info-parser = { version = "0.15.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" } +marine-wasmtime-backend = { version = "0.6.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" } # avm -avm-server = { version = "=0.37.0", path = "../aquavm/avm/server" } +avm-server = { git = "https://github.com/fluencelabs/aquavm", version = "=0.37.0", branch = "feat/use-async-marine" } air-interpreter-wasm = "=0.62.0" # libp2p diff --git a/aquamarine/src/aqua_runtime.rs b/aquamarine/src/aqua_runtime.rs index f489d2c24a..701d6bbefb 100644 --- a/aquamarine/src/aqua_runtime.rs +++ b/aquamarine/src/aqua_runtime.rs @@ -21,9 +21,9 @@ use avm_server::avm_runner::{AVMRunner, RawAVMOutcome}; use avm_server::{ AVMMemoryStats, AVMRuntimeLimits, CallRequests, CallResults, ParticleParameters, RunnerError, }; -use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend}; use fluence_keypair::KeyPair; use libp2p::PeerId; +use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend}; use tracing::Level; use crate::config::VmConfig; @@ -72,22 +72,23 @@ impl AquaRuntime for AVMRunner { .epoch_interruption(true) .async_wasm_stack(2 * 1024 * 1024) .max_wasm_stack(2 * 1024 * 1024); - let backend = WasmtimeWasmBackend::new(wasmtime_config) - .map_err(|e| Self::Error::MarineError( - fluence_app_service::MarineError::EngineError(e.into())))?; + let backend = WasmtimeWasmBackend::new(wasmtime_config).map_err(|e| { + Self::Error::MarineError(fluence_app_service::MarineError::EngineError(e.into())) + })?; let avm_runtime_limits = AVMRuntimeLimits::new( config.air_size_limit, config.particle_size_limit, config.call_result_size_limit, config.hard_limit_enabled, ); - let vm: AVMRunner = tokio::runtime::Handle::current().block_on(AVMRunner::new( - config.air_interpreter, - config.max_heap_size, - avm_runtime_limits, - i32::MAX, - backend, - ))?; + let vm: AVMRunner = + tokio::runtime::Handle::current().block_on(AVMRunner::new( + config.air_interpreter, + config.max_heap_size, + avm_runtime_limits, + i32::MAX, + backend, + ))?; waker.wake(); Ok(vm) } diff --git a/crates/local-vm/src/local_vm.rs b/crates/local-vm/src/local_vm.rs index 111d61a645..3294b60420 100644 --- a/crates/local-vm/src/local_vm.rs +++ b/crates/local-vm/src/local_vm.rs @@ -22,9 +22,9 @@ use std::{collections::HashMap, time::Duration}; use avm_server::avm_runner::{AVMRunner, RawAVMOutcome}; use avm_server::{CallResults, CallServiceResult}; -use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend}; use fstrings::f; use libp2p::PeerId; +use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend}; use serde_json::{json, Value as JValue}; use air_interpreter_fs::{air_interpreter_path, write_default_air_interpreter}; @@ -180,24 +180,29 @@ pub fn make_wasm_backend() -> WasmtimeWasmBackend { .epoch_interruption(true) .async_wasm_stack(2 * 1024 * 1024) .max_wasm_stack(2 * 1024 * 1024); - WasmtimeWasmBackend::new(wasmtime_config) - .expect("Cannot create WasmtimeWasmBackend") + WasmtimeWasmBackend::new(wasmtime_config).expect("Cannot create WasmtimeWasmBackend") } pub async fn make_vm(tmp_dir_path: &Path) -> AVMRunner { let interpreter = air_interpreter_path(tmp_dir_path); write_default_air_interpreter(&interpreter).expect("write air interpreter"); - AVMRunner::new(interpreter, None, <_>::default(), i32::MAX, make_wasm_backend()) - .await - .map_err(|err| { - log::error!("\n\n\nFailed to create local AVM: {:#?}\n\n\n", err); + AVMRunner::new( + interpreter, + None, + <_>::default(), + i32::MAX, + make_wasm_backend(), + ) + .await + .map_err(|err| { + log::error!("\n\n\nFailed to create local AVM: {:#?}\n\n\n", err); - println!("\n\n\nFailed to create local AVM: {err:#?}\n\n\n"); + println!("\n\n\nFailed to create local AVM: {err:#?}\n\n\n"); - err - }) - .expect("vm should be created") + err + }) + .expect("vm should be created") } pub fn wrap_script( diff --git a/crates/nox-tests/tests/services.rs b/crates/nox-tests/tests/services.rs index 9a99fdf84e..b0cc8239c6 100644 --- a/crates/nox-tests/tests/services.rs +++ b/crates/nox-tests/tests/services.rs @@ -59,7 +59,7 @@ async fn test_system_service_override() { config, }; let name = service_name.clone(); - let init: system_services::InitService = Box::new(move |call, status| { + let init: dyn system_services::InitService = Box::new(move |call, status| { let name = name.clone(); let service_status = status .services diff --git a/crates/spell-service-api/src/lib.rs b/crates/spell-service-api/src/lib.rs index 2f2a088cd7..05ebe90fe1 100644 --- a/crates/spell-service-api/src/lib.rs +++ b/crates/spell-service-api/src/lib.rs @@ -111,24 +111,24 @@ impl SpellServiceApi { Self { services } } - pub fn set_script(&self, params: CallParams, script: String) -> Result<(), CallError> { + pub async fn set_script(&self, params: CallParams, script: String) -> Result<(), CallError> { let function = Function { name: "set_script", args: vec![json!(script)], }; - let _ = self.call::(params, function)?; + let _ = self.call::(params, function).await?; Ok(()) } - pub fn get_script(&self, params: CallParams) -> Result { + pub async fn get_script(&self, params: CallParams) -> Result { let function = Function { name: "get_script", args: vec![], }; - let script_value = self.call::(params, function)?; + let script_value = self.call::(params, function).await?; Ok(script_value.value) } - pub fn set_trigger_config( + pub async fn set_trigger_config( &self, params: CallParams, config: TriggerConfig, @@ -137,39 +137,43 @@ impl SpellServiceApi { name: "set_trigger_config", args: vec![json!(config)], }; - let _ = self.call::(params, function)?; + let _ = self.call::(params, function).await?; Ok(()) } - pub fn get_trigger_config(&self, params: CallParams) -> Result { + pub async fn get_trigger_config(&self, params: CallParams) -> Result { let function = Function { name: "get_trigger_config", args: vec![], }; - let trigger_config_value = self.call::(params, function)?; + let trigger_config_value = self.call::(params, function).await?; Ok(trigger_config_value.config) } // TODO: use `Map` for init_data instead of `Value` - pub fn update_kv(&self, params: CallParams, kv_data: Value) -> Result<(), CallError> { + pub async fn update_kv(&self, params: CallParams, kv_data: Value) -> Result<(), CallError> { let function = Function { name: "set_json_fields", args: vec![json!(kv_data.to_string())], }; - let _ = self.call::(params, function)?; + let _ = self.call::(params, function).await?; Ok(()) } - pub fn get_string(&self, params: CallParams, key: String) -> Result, CallError> { + pub async fn get_string( + &self, + params: CallParams, + key: String, + ) -> Result, CallError> { let function = Function { name: "get_string", args: vec![json!(key)], }; - let result = self.call::(params, function)?; + let result = self.call::(params, function).await?; Ok((!result.absent).then_some(result.value)) } - pub fn set_string( + pub async fn set_string( &self, params: CallParams, key: String, @@ -179,60 +183,68 @@ impl SpellServiceApi { name: "set_string", args: vec![json!(key), json!(value)], }; - let _ = self.call::(params, function)?; + let _ = self.call::(params, function).await?; Ok(()) } /// Load the counter (how many times the spell was run) - pub fn get_counter(&self, params: CallParams) -> Result, CallError> { + pub async fn get_counter(&self, params: CallParams) -> Result, CallError> { let function = Function { name: "get_u32", args: vec![json!("hw_counter")], }; - let result = self.call::(params, function)?; + let result = self.call::(params, function).await?; Ok((!result.absent).then_some(result.value)) } /// Update the counter (how many times the spell was run) /// TODO: permission check here or not? - pub fn set_counter(&self, params: CallParams, counter: u32) -> Result<(), CallError> { + pub async fn set_counter(&self, params: CallParams, counter: u32) -> Result<(), CallError> { let function = Function { name: "set_u32", args: vec![json!("hw_counter"), json!(counter)], }; - let _ = self.call::(params, function)?; + let _ = self.call::(params, function).await?; Ok(()) } - pub fn set_trigger_event(&self, params: CallParams, event: String) -> Result<(), CallError> { + pub async fn set_trigger_event( + &self, + params: CallParams, + event: String, + ) -> Result<(), CallError> { self.set_string(params, "hw_trigger".to_string(), event) + .await } - pub fn store_error(&self, params: CallParams, args: Vec) -> Result<(), CallError> { + pub async fn store_error(&self, params: CallParams, args: Vec) -> Result<(), CallError> { let function = Function { name: "store_error", args, }; - let _ = self.call::(params, function)?; + let _ = self.call::(params, function).await?; Ok(()) } - fn call(&self, params: CallParams, function: Function) -> Result + async fn call(&self, params: CallParams, function: Function) -> Result where T: DeserializeOwned + SpellValueT, { use CallError::*; let spell_id = params.spell_id; - let result = self.services.call_function( - params.peer_scope, - &spell_id, - function.name, - function.args, - params.particle_id, - params.init_peer_id, - params.ttl, - ); + let result = self + .services + .call_function( + params.peer_scope, + &spell_id, + function.name, + function.args, + params.particle_id, + params.init_peer_id, + params.ttl, + ) + .await; match result { FunctionOutcome::NotDefined { .. } => Err(ServiceNotFound { spell_id, diff --git a/crates/system-services/Cargo.toml b/crates/system-services/Cargo.toml index c7f11b268e..78277d7549 100644 --- a/crates/system-services/Cargo.toml +++ b/crates/system-services/Cargo.toml @@ -19,6 +19,7 @@ libp2p = { workspace = true } log = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } +async-trait = { workspace = true } fluence-spell-dtos = { workspace = true } tracing = { workspace = true } diff --git a/crates/system-services/src/deployer.rs b/crates/system-services/src/deployer.rs index a159b538b0..8d12979f12 100644 --- a/crates/system-services/src/deployer.rs +++ b/crates/system-services/src/deployer.rs @@ -1,6 +1,7 @@ use crate::distro::*; -use crate::CallService; -use crate::{DeploymentStatus, PackageDistro, ServiceDistro, ServiceStatus, SpellDistro}; +use crate::{CallService, FunctionName, ServiceName}; +use crate::{Deployment, PackageDistro, ServiceDistro, ServiceStatus, SpellDistro}; +use async_trait::async_trait; use eyre::eyre; use futures::{FutureExt, StreamExt, TryStreamExt}; use libp2p::PeerId; @@ -88,16 +89,17 @@ impl Deployer { let services = self.services.clone(); let root_worker_id = self.host_peer_id; - let call: CallService = Box::new(move |srv, fnc, args| { - call_service(&services, PeerScope::Host, root_worker_id, &srv, &fnc, args) - }); - + let call: &dyn CallService = &CallServiceParams { + services: &services, + peer_scope: PeerScope::Host, + init_peer_id: root_worker_id, + }; let parallelism = available_parallelism() .map(|x| x.get()) .unwrap_or(DEFAULT_PARALLELISM); futures::stream::iter(self.system_service_distros.distros.values()) - .map(|distro| async { self.deploy_package(&call, distro.clone()).await }.boxed()) + .map(|distro| async { self.deploy_package(call, distro.clone()).await }.boxed()) .boxed() .buffer_unordered(parallelism) .try_collect::>() @@ -106,7 +108,11 @@ impl Deployer { Ok(()) } - async fn deploy_package(&self, call: &CallService, package: PackageDistro) -> eyre::Result<()> { + async fn deploy_package( + &self, + call_service: &dyn CallService, + package: PackageDistro, + ) -> eyre::Result<()> { let mut services = HashMap::new(); for service_distro in package.services { let name = service_distro.name.clone(); @@ -120,10 +126,10 @@ impl Deployer { let result = self.deploy_system_spell(spell_distro).await?; spells.insert(name, result); } - let status = DeploymentStatus { services, spells }; + let status = Deployment { services, spells }; if let Some(init) = package.init { - init(call, status)?; + init.init(call_service, status).await?; } Ok(()) @@ -131,7 +137,7 @@ impl Deployer { async fn deploy_system_spell(&self, spell_distro: SpellDistro) -> eyre::Result { let spell_name = spell_distro.name.clone(); - match self.find_same_spell(&spell_distro) { + match self.find_same_spell(&spell_distro).await { Some(spell_id) => { tracing::debug!( spell_name, @@ -193,12 +199,16 @@ impl Deployer { ); // update trigger config let config = spell_distro.trigger_config.clone(); - self.spells_api.set_trigger_config(params.clone(), config)?; + self.spells_api + .set_trigger_config(params.clone(), config) + .await?; // update spell script let air = spell_distro.air.to_string(); - self.spells_api.set_script(params.clone(), air)?; + self.spells_api.set_script(params.clone(), air).await?; // update init_data without affecting other keys - self.spells_api.update_kv(params, json!(spell_distro.kv))?; + self.spells_api + .update_kv(params, json!(spell_distro.kv)) + .await?; // resubscribe spell if let Some(trigger_config) = trigger_config { @@ -258,10 +268,11 @@ impl Deployer { } // Two spells are the same if they have the same alias - fn find_same_spell(&self, new_spell: &SpellDistro) -> Option { - let existing_spell = - self.services - .get_service_info(PeerScope::Host, new_spell.name.to_string(), ""); + async fn find_same_spell(&self, new_spell: &SpellDistro) -> Option { + let existing_spell = self + .services + .get_service_info(PeerScope::Host, new_spell.name.to_string(), "") + .await; match existing_spell { Err(ServiceError::NoSuchService(_, _)) => { log::debug!("no existing spell found for {}", new_spell.name); @@ -293,7 +304,10 @@ impl Deployer { let service_name = service_distro.name.clone(); let blueprint_id = self.add_modules(service_distro)?; - match self.find_same_service(service_name.to_string(), &blueprint_id) { + match self + .find_same_service(service_name.to_string(), &blueprint_id) + .await + { ServiceUpdateStatus::NeedUpdate(service_id) => { tracing::debug!(service_name, service_id, "found existing service that needs to be updated; will remove the old service and deploy a new one"); let result = self @@ -345,12 +359,17 @@ impl Deployer { Ok(ServiceStatus::Created(service_id)) } - fn find_same_service(&self, service_name: String, blueprint_id: &str) -> ServiceUpdateStatus { + async fn find_same_service( + &self, + service_name: String, + blueprint_id: &str, + ) -> ServiceUpdateStatus { // Check that the service exist and has the same blueprint. // In this case, we don't create a new one. - let existing_service = - self.services - .get_service_info(PeerScope::Host, service_name.to_string(), ""); + let existing_service = self + .services + .get_service_info(PeerScope::Host, service_name.to_string(), "") + .await; if let Ok(service) = existing_service { if service.service_type == ServiceType::Spell { log::warn!( @@ -402,55 +421,66 @@ impl Deployer { } } -fn call_service( - services: &ParticleAppServices, +struct CallServiceParams<'a> { + services: &'a ParticleAppServices, peer_scope: PeerScope, init_peer_id: PeerId, - service_id: &str, - function_name: &str, - args: Vec, -) -> eyre::Result<()> { - let result = services.call_function( - peer_scope, - service_id, - function_name, - args, - None, - init_peer_id, - DEPLOYER_TTL, - ); - // similar to process_func_outcome in sorcerer/src/utils.rs, but that func is - // to specialized to spell specific - match result { - FunctionOutcome::Ok(result) => { - let call_result: Option> = try { - let result = result.as_object()?; - let is_success = result["success"].as_bool()?; - if !is_success { - if let Some(error) = result["error"].as_str() { - Err(eyre!( - "Call {service_id}.{function_name} returned error: {}", - error - )) +} + +#[async_trait] +impl<'a> CallService for CallServiceParams<'a> { + async fn call( + &self, + service_name: ServiceName, + function_name: FunctionName, + args: Vec, + ) -> eyre::Result<()> { + let service_id = service_name; + let result = self + .services + .call_function( + self.peer_scope, + service_id.as_str(), + function_name.as_str(), + args, + None, + self.init_peer_id, + DEPLOYER_TTL, + ) + .await; + // similar to process_func_outcome in sorcerer/src/utils.rs, but that func is + // to specialized to spell specific + match result { + FunctionOutcome::Ok(result) => { + let call_result: Option> = try { + let result = result.as_object()?; + let is_success = result["success"].as_bool()?; + if !is_success { + if let Some(error) = result["error"].as_str() { + Err(eyre!( + "Call {service_id}.{function_name} returned error: {}", + error + )) + } else { + Err(eyre!("Call {service_id}.{function_name} returned error")) + } } else { - Err(eyre!("Call {service_id}.{function_name} returned error")) + Ok(()) } - } else { - Ok(()) - } - }; - call_result.unwrap_or_else(|| { - Err(eyre!( - "Call {service_id}.{function_name} return invalid result: {result}" - )) - }) - } - FunctionOutcome::NotDefined { .. } => { - Err(eyre!("Service {service_id} ({function_name}) not found")) + }; + call_result.unwrap_or_else(|| { + Err(eyre!( + "Call {service_id}.{function_name} return invalid result: {result}" + )) + }) + } + FunctionOutcome::NotDefined { .. } => { + Err(eyre!("Service {service_id} ({function_name}) not found")) + } + FunctionOutcome::Empty => Err(eyre!( + "Call {service_id}.{function_name} didn't return any result" + )), + FunctionOutcome::Err(err) => Err(eyre!(err)), } - FunctionOutcome::Empty => Err(eyre!( - "Call {service_id}.{function_name} didn't return any result" - )), - FunctionOutcome::Err(err) => Err(eyre!(err)), } } diff --git a/crates/system-services/src/distro.rs b/crates/system-services/src/distro.rs index 8f545b3bdf..712763423b 100644 --- a/crates/system-services/src/distro.rs +++ b/crates/system-services/src/distro.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use fluence_app_service::TomlMarineConfig; use fluence_spell_dtos::trigger_config::TriggerConfig; use serde_json::json; @@ -7,10 +8,11 @@ use server_config::system_services_config::{ }; use std::collections::HashMap; use std::sync::Arc; +use trust_graph_distro::Certs; use crate::{ - apply_binary_path_override, CallService, DeploymentStatus, InitService, PackageDistro, - ServiceDistro, ServiceStatus, SpellDistro, + apply_binary_path_override, CallService, Deployment, InitService, PackageDistro, ServiceDistro, + ServiceStatus, SpellDistro, }; #[derive(Debug, Clone)] @@ -41,14 +43,14 @@ impl SystemServiceDistros { let distros: HashMap = config .enable .iter() - .map(|key| { + .map(move |key| { let distro = match key { AquaIpfs => default_aqua_ipfs_distro(&config.aqua_ipfs), TrustGraph => default_trust_graph_distro(), Registry => default_registry_distro(&config.registry), Decider => default_decider_distro(&config.decider, &config.connector), }; - distro.map(|d| (d.name.clone(), d)) + distro.map(move |d| (d.name.clone(), d)) }) .collect::>()?; @@ -90,32 +92,52 @@ impl SystemServiceDistros { } } -pub fn default_trust_graph_distro() -> eyre::Result { - use trust_graph_distro::*; +struct TrustGraphInit<'a> { + name: String, + certs: &'a Certs, +} - fn mk_init(name: String, certs: &'static trust_graph_distro::Certs) -> InitService { - let init = move |call: &CallService, deployment: DeploymentStatus| { - if let Some(ServiceStatus::Created(id)) = deployment.services.get(&name) { - call( - name.clone(), +#[async_trait] +impl<'a> InitService for TrustGraphInit<'a> { + async fn init( + &self, + call_service: &dyn CallService, + deployment: Deployment, + ) -> eyre::Result<()> { + if let Some(ServiceStatus::Created(id)) = deployment.services.get(&self.name) { + call_service + .call( + self.name.clone(), "set_root".to_string(), - vec![json!(certs.root_node), json!(certs.max_chain_length)], - )?; + vec![ + json!(self.certs.root_node), + json!(self.certs.max_chain_length), + ], + ) + .await?; - let timestamp = now_millis::now_sec(); - for cert_chain in &certs.certs { - call( - name.clone(), + let timestamp = now_millis::now_sec(); + for cert_chain in &self.certs.certs { + call_service + .call( + self.name.clone(), "insert_cert".to_string(), vec![json!(cert_chain), json!(timestamp)], - )?; - } - tracing::info!(service_id = id, service_alias = name, "initialized service"); + ) + .await?; } - Ok(()) as eyre::Result<()> - }; - Box::new(init) + tracing::info!( + service_id = id, + service_alias = self.name, + "initialized service" + ); + } + Ok(()) as eyre::Result<()> } +} + +pub fn default_trust_graph_distro<'a>() -> eyre::Result { + use trust_graph_distro::*; let config: TomlMarineConfig = toml::from_slice(CONFIG)?; let service_distro = ServiceDistro { @@ -123,50 +145,69 @@ pub fn default_trust_graph_distro() -> eyre::Result { config, name: TrustGraph.to_string(), }; - let certs: &'static trust_graph_distro::Certs = &trust_graph_distro::KRAS_CERTS; - let init = mk_init(TrustGraph.to_string(), certs); + let certs: &'static Certs = &trust_graph_distro::KRAS_CERTS; + let init = TrustGraphInit { + name: TrustGraph.to_string(), + certs, + }; let package = PackageDistro { name: TrustGraph.to_string(), version: VERSION, services: vec![service_distro], spells: vec![], - init: Some(Arc::new(init)), + init: Some(Arc::new(Box::new(init))), }; Ok(package) } -pub fn default_aqua_ipfs_distro(config: &AquaIpfsConfig) -> eyre::Result { - use aqua_ipfs_distro::*; +struct AquaIpfsConfigInit { + local_api_multiaddr: String, + external_api_multiaddr: String, + name: String, +} - fn mk_init(name: String, config: &AquaIpfsConfig) -> InitService { - let local_api_multiaddr = config.local_api_multiaddr.clone(); - let external_api_multiaddr = config.external_api_multiaddr.clone(); - let init = move |call: &CallService, deployment: DeploymentStatus| { - if let Some(ServiceStatus::Created(id) | ServiceStatus::Existing(id)) = - deployment.services.get(&name) - { - let set_local_result = call( - name.clone(), +#[async_trait] +impl InitService for AquaIpfsConfigInit { + async fn init( + &self, + call_service: &dyn CallService, + deployment: Deployment, + ) -> eyre::Result<()> { + if let Some(ServiceStatus::Created(id) | ServiceStatus::Existing(id)) = + deployment.services.get(&self.name) + { + let set_local_result = call_service + .call( + self.name.clone(), "set_local_api_multiaddr".to_string(), - vec![json!(local_api_multiaddr)], - ); + vec![json!(self.local_api_multiaddr)], + ) + .await; - let set_external_result = call( - name.clone(), + let set_external_result = call_service + .call( + self.name.clone(), "set_external_api_multiaddr".to_string(), - vec![json!(external_api_multiaddr)], - ); + vec![json!(self.external_api_multiaddr)], + ) + .await; - // try to set local and external api multiaddrs, and only then produce an error - set_local_result?; - set_external_result?; + // try to set local and external api multiaddrs, and only then produce an error + set_local_result?; + set_external_result?; - tracing::info!(service_id = id, service_alias = name, "initialized service"); - } - Ok(()) - }; - Box::new(init) + tracing::info!( + service_id = id, + service_alias = self.name, + "initialized service" + ); + } + Ok(()) } +} + +pub fn default_aqua_ipfs_distro(config: &AquaIpfsConfig) -> eyre::Result { + use aqua_ipfs_distro::*; let mut marine_config: TomlMarineConfig = toml::from_slice(CONFIG)?; apply_binary_path_override( @@ -182,13 +223,21 @@ pub fn default_aqua_ipfs_distro(config: &AquaIpfsConfig) -> eyre::Result eyre::Result( decider_config: &DeciderConfig, connector_config: &ConnectorConfig, ) -> eyre::Result { diff --git a/crates/system-services/src/lib.rs b/crates/system-services/src/lib.rs index d5b545131b..b23a20c01d 100644 --- a/crates/system-services/src/lib.rs +++ b/crates/system-services/src/lib.rs @@ -4,6 +4,11 @@ mod deployer; mod distro; +use async_trait::async_trait; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + pub use deployer::Deployer; pub use distro::SystemServiceDistros; pub use distro::Versions; @@ -11,13 +16,19 @@ pub use distro::Versions; use fluence_app_service::TomlMarineConfig; use fluence_spell_dtos::trigger_config::TriggerConfig; use serde_json::Value; -use std::collections::HashMap; -use std::fmt; -use std::sync::Arc; type ServiceName = String; type FunctionName = String; +/// Status of package deployment for each services and spells of the package +#[derive(Clone, Debug)] +pub struct Deployment { + /// Statuses of spells deployment + pub spells: HashMap, + /// Statuses of services deployment + pub services: HashMap, +} + /// Call service functions. Accepts /// - service name /// - function name @@ -27,14 +38,27 @@ type FunctionName = String; /// The functions called via this callback must return a result with execution status in field `status: bool` /// and error message in the field `error: string`. /// Otherwise, the output will be consider invalid. -pub type CallService = - Box) -> eyre::Result<()> + Send + Sync>; +#[async_trait] +pub trait CallService: Send + Sync { + async fn call( + &self, + service_name: ServiceName, + function_name: FunctionName, + args: Vec, + ) -> eyre::Result<()>; +} /// Initialization function to initialize services /// - accepts `DeploymentStatus` of services and spells to be able to update or initialize the services /// - accepts `CallService` to be able to call installed services for initialization. -pub type InitService = - Box eyre::Result<()> + Send + Sync>; +#[async_trait] +pub trait InitService: Send + Sync { + async fn init( + &self, + call_service: &dyn CallService, + deployment: Deployment, + ) -> eyre::Result<()>; +} /// Package distribution description /// Contains enough information about all services and spells used by the package for installation @@ -52,7 +76,7 @@ pub struct PackageDistro { /// List of spells needed by the package pub spells: Vec, /// Optionally, initialization function for the services. - pub init: Option>, + pub init: Option>>, } impl fmt::Debug for PackageDistro { @@ -125,15 +149,6 @@ pub enum ServiceStatus { Existing(String), } -/// Status of package deployment for each services and spells of the package -#[derive(Clone, Debug)] -pub struct DeploymentStatus { - /// Statuses of spells deployment - pub spells: HashMap, - /// Statuses of services deployment - pub services: HashMap, -} - /// Override a binary path to a binary for a module in the service configuration fn apply_binary_path_override( config: &mut TomlMarineConfig, diff --git a/nox/src/main.rs b/nox/src/main.rs index a581190cbb..3d7a67499b 100644 --- a/nox/src/main.rs +++ b/nox/src/main.rs @@ -36,7 +36,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use air_interpreter_fs::write_default_air_interpreter; -use aquamarine::{DataStoreConfig, VmConfig, AVMRunner}; +use aquamarine::{AVMRunner, DataStoreConfig, VmConfig}; use config_utils::to_peer_id; use core_manager::{CoreManager, CoreManagerFunctions, DevCoreManager, StrictCoreManager}; use fs_utils::to_abs_path; diff --git a/nox/src/node.rs b/nox/src/node.rs index 8afb4073bf..a8a09e558f 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -360,7 +360,8 @@ impl Node { scopes.clone(), spell_service_api.clone(), spell_metrics, - ); + ) + .await; let allowed_binaries = config .allowed_effectors diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs index 51940fc40c..5226ee4d99 100644 --- a/particle-builtins/src/builtins.rs +++ b/particle-builtins/src/builtins.rs @@ -152,7 +152,7 @@ where let end = start.elapsed().as_secs(); match result { - FunctionOutcome::NotDefined { args, params } => self.call_service(args, params), + FunctionOutcome::NotDefined { args, params } => self.call_service(args, params).await, result => { if let Some(metrics) = self.services.metrics.as_ref() { metrics.observe_builtins(result.not_err(), end as f64); @@ -204,19 +204,19 @@ where ("kad", "neigh_with_addrs") => wrap(self.neighborhood_with_addresses(args).await), ("kad", "merge") => wrap(self.kad_merge(args.function_args)), - ("srv", "list") => ok(self.list_services(particle)), + ("srv", "list") => ok(self.list_services(particle).await), ("srv", "create") => wrap(self.create_service(args, particle).await), - ("srv", "get_interface") => wrap(self.get_interface(args, particle)), - ("srv", "resolve_alias") => wrap(self.resolve_alias(args, particle)), - ("srv", "resolve_alias_opt") => wrap(self.resolve_alias_opt(args, particle)), + ("srv", "get_interface") => wrap(self.get_interface(args, particle).await), + ("srv", "resolve_alias") => wrap(self.resolve_alias(args, particle).await), + ("srv", "resolve_alias_opt") => wrap(self.resolve_alias_opt(args, particle).await), ("srv", "add_alias") => wrap_unit(self.add_alias(args, particle).await), ("srv", "remove") => wrap_unit(self.remove_service(args, particle).await), - ("srv", "info") => wrap(self.get_service_info(args, particle)), + ("srv", "info") => wrap(self.get_service_info(args, particle).await), - ("dist", "add_module_from_vault") => wrap(self.add_module_from_vault(args, particle)), - ("dist", "add_module") => wrap(self.add_module(args, particle)), - ("dist", "add_module_bytes_from_vault") => wrap(self.add_module_bytes_from_vault(args, particle)), - ("dist", "add_blueprint") => wrap(self.add_blueprint(args, particle)), + ("dist", "add_module_from_vault") => wrap(self.add_module_from_vault(args, particle).await), + ("dist", "add_module") => wrap(self.add_module(args, particle).await), + ("dist", "add_module_bytes_from_vault") => wrap(self.add_module_bytes_from_vault(args, particle).await), + ("dist", "add_blueprint") => wrap(self.add_blueprint(args, particle).await), ("dist", "make_module_config") => wrap(make_module_config(args)), ("dist", "load_module_config") => wrap(self.load_module_config_from_vault(args, particle)), ("dist", "default_module_config") => wrap(self.default_module_config(args)), @@ -241,8 +241,8 @@ where ("debug", "stringify") => self.stringify(args.function_args), - ("stat", "service_memory") => wrap(self.service_mem_stats(args, particle)), - ("stat", "service_stat") => wrap(self.service_stat(args, particle)), + ("stat", "service_memory") => wrap(self.service_mem_stats(args, particle).await), + ("stat", "service_stat") => wrap(self.service_stat(args, particle).await), ("math", "add") => binary(args, |x: i64, y: i64| -> R { math::add(x, y) }), ("math", "sub") => binary(args, |x: i64, y: i64| -> R { math::sub(x, y) }), @@ -284,7 +284,7 @@ where ("subnet", "resolve") => wrap(self.subnet_resolve(args).await), ("run-console", "print") => { - self.guard_protected(&particle)?; + self.guard_protected(&particle).await?; let function_args = args.function_args.iter(); let decider = function_args.filter_map(JValue::as_str).any(|s| s.contains("decider")); @@ -304,7 +304,7 @@ where // a worker spell. Otherwise we allow the call to go and find an aqua-ipfs service // since it can be a user-defined service which isn't the same as system aqua-ipfs. if matches!(particle.peer_scope, PeerScope::Host) { - self.guard_protected(&particle)?; + self.guard_protected(&particle).await?; } FunctionOutcome::NotDefined { args, params: particle } } @@ -378,7 +378,7 @@ where let peer_id = PeerId::from_str(peer_id.as_str())?; let addrs: Vec = Args::next_opt("addresses", &mut args)?.unwrap_or_default(); - self.guard_protected(¶ms)?; + self.guard_protected(¶ms).await?; let contact = Contact::new(peer_id, addrs); @@ -629,23 +629,27 @@ where Ok(JValue::Array(slice)) } - fn add_module(&self, args: Args, params: ParticleParams) -> Result { + async fn add_module(&self, args: Args, params: ParticleParams) -> Result { let mut args = args.function_args.into_iter(); let module_bytes: String = Args::next("module_bytes", &mut args)?; let config = Args::next("config", &mut args)?; - self.guard_protected(¶ms)?; + self.guard_protected(¶ms).await?; let hash = self.modules.add_module_base64(module_bytes, config)?; Ok(json!(hash)) } - fn add_module_from_vault(&self, args: Args, params: ParticleParams) -> Result { + async fn add_module_from_vault( + &self, + args: Args, + params: ParticleParams, + ) -> Result { let mut args = args.function_args.into_iter(); let module_path: String = Args::next("module_path", &mut args)?; let config: TomlMarineNamedModuleConfig = Args::next("config", &mut args)?; - self.guard_protected(¶ms)?; + self.guard_protected(¶ms).await?; let module_hash = self.modules.add_module_from_vault( &self.services.vault, @@ -658,7 +662,7 @@ where Ok(json!(module_hash)) } - fn add_module_bytes_from_vault( + async fn add_module_bytes_from_vault( &self, args: Args, params: ParticleParams, @@ -667,7 +671,7 @@ where let module_name: String = Args::next("module_name", &mut args)?; let module_path: String = Args::next("module_path", &mut args)?; - self.guard_protected(¶ms)?; + self.guard_protected(¶ms).await?; let module_hash = self.modules.add_module_from_vault( &self.services.vault, @@ -680,11 +684,11 @@ where Ok(json!(module_hash)) } - fn add_blueprint(&self, args: Args, params: ParticleParams) -> Result { + async fn add_blueprint(&self, args: Args, params: ParticleParams) -> Result { let mut args = args.function_args.into_iter(); let blueprint: String = Args::next("blueprint", &mut args)?; - self.guard_protected(¶ms)?; + self.guard_protected(¶ms).await?; let blueprint = AddBlueprint::decode(blueprint.as_bytes()).map_err(|err| { JError::new(format!("Error deserializing blueprint from IPLD: {err}")) @@ -800,7 +804,7 @@ where let mut args = args.function_args.into_iter(); let blueprint_id: String = Args::next("blueprint_id", &mut args)?; - self.guard_protected(¶ms)?; + self.guard_protected(¶ms).await?; let service_id = self .services @@ -819,7 +823,7 @@ where let mut args = args.function_args.into_iter(); let service_id_or_alias: String = Args::next("service_id_or_alias", &mut args)?; - self.guard_protected(¶ms)?; + self.guard_protected(¶ms).await?; self.services .remove_service( @@ -834,26 +838,30 @@ where Ok(()) } - fn list_services(&self, params: ParticleParams) -> JValue { + async fn list_services(&self, params: ParticleParams) -> JValue { Array( self.services .list_services(params.peer_scope) + .await .iter() .map(|info| json!(Service::from(info, self.scopes.clone()))) .collect(), ) } - fn call_service(&self, function_args: Args, particle: ParticleParams) -> FunctionOutcome { - self.services.call_service(function_args, particle, true) + async fn call_service(&self, function_args: Args, particle: ParticleParams) -> FunctionOutcome { + self.services + .call_service(function_args, particle, true) + .await } - fn get_interface(&self, args: Args, params: ParticleParams) -> Result { + async fn get_interface(&self, args: Args, params: ParticleParams) -> Result { let mut args = args.function_args.into_iter(); let service_id: String = Args::next("service_id", &mut args)?; Ok(self .services - .get_interface(params.peer_scope, service_id, ¶ms.id)?) + .get_interface(params.peer_scope, service_id, ¶ms.id) + .await?) } async fn add_alias(&self, args: Args, params: ParticleParams) -> Result<(), JError> { @@ -862,7 +870,7 @@ where let alias: String = Args::next("alias", &mut args)?; let service_id: String = Args::next("service_id", &mut args)?; - self.guard_protected(¶ms)?; + self.guard_protected(¶ms).await?; self.services .add_alias( @@ -883,12 +891,13 @@ where Ok(()) } - fn resolve_alias(&self, args: Args, params: ParticleParams) -> Result { + async fn resolve_alias(&self, args: Args, params: ParticleParams) -> Result { let mut args = args.function_args.into_iter(); let alias: String = Args::next("alias", &mut args)?; - let service_id = - self.services - .resolve_alias(params.peer_scope, alias.clone(), ¶ms.id)?; + let service_id = self + .services + .resolve_alias(params.peer_scope, alias.clone(), ¶ms.id) + .await?; log::debug!( "Resolved alias {} to service {:?} {}", @@ -900,24 +909,30 @@ where Ok(JValue::String(service_id)) } - fn resolve_alias_opt(&self, args: Args, params: ParticleParams) -> Result { + async fn resolve_alias_opt( + &self, + args: Args, + params: ParticleParams, + ) -> Result { let mut args = args.function_args.into_iter(); let alias: String = Args::next("alias", &mut args)?; let service_id_opt = self .services .resolve_alias(params.peer_scope, alias, ¶ms.id) + .await .map(|id| vec![JValue::String(id)]) .unwrap_or_default(); Ok(Array(service_id_opt)) } - fn get_service_info(&self, args: Args, params: ParticleParams) -> Result { + async fn get_service_info(&self, args: Args, params: ParticleParams) -> Result { let mut args = args.function_args.into_iter(); let service_id_or_alias: String = Args::next("service_id_or_alias", &mut args)?; - let info = - self.services - .get_service_info(params.peer_scope, service_id_or_alias, ¶ms.id)?; + let info = self + .services + .get_service_info(params.peer_scope, service_id_or_alias, ¶ms.id) + .await?; Ok(json!(Service::from(&info, self.scopes.clone()))) } @@ -930,22 +945,28 @@ where self.connectivity.as_ref() } - fn service_mem_stats(&self, args: Args, params: ParticleParams) -> Result { + async fn service_mem_stats( + &self, + args: Args, + params: ParticleParams, + ) -> Result { let mut args = args.function_args.into_iter(); let service_id_or_alias: String = Args::next("service_id", &mut args)?; self.services .get_service_mem_stats(params.peer_scope, service_id_or_alias, ¶ms.id) + .await .map(Array) } - fn service_stat(&self, args: Args, params: ParticleParams) -> Result { + async fn service_stat(&self, args: Args, params: ParticleParams) -> Result { let mut args = args.function_args.into_iter(); let service_id_or_alias: String = Args::next("service_id", &mut args)?; // Resolve aliases; also checks that the requested service exists. - let service_id = - self.services - .to_service_id(params.peer_scope, service_id_or_alias, ¶ms.id)?; + let service_id = self + .services + .to_service_id(params.peer_scope, service_id_or_alias, ¶ms.id) + .await?; let metrics = self .services .metrics @@ -1090,8 +1111,8 @@ where Ok(json!(result)) } - fn guard_protected(&self, particle: &ParticleParams) -> Result<(), JError> { - if self.is_worker_spell(particle) + async fn guard_protected(&self, particle: &ParticleParams) -> Result<(), JError> { + if self.is_worker_spell(particle).await || self.scopes.is_host(particle.init_peer_id) || self.scopes.is_management(particle.init_peer_id) { @@ -1108,13 +1129,14 @@ where // 2. init_peer_id must be a local peer id for the host (either host id or a worker id) // 3. There must be a spell service with alias "worker-spell" on the init_peer_id // 4. The spell service must have the same spell_id as in the particle and be of type "spell" - fn is_worker_spell(&self, particle: &ParticleParams) -> bool { + async fn is_worker_spell(&self, particle: &ParticleParams) -> bool { let result: Option<_> = try { let local_scope = self.scopes.scope(particle.init_peer_id).ok()?; let spell_id = ParticleParams::get_spell_id(&particle.id)?; let (worker_service, _) = self .services .get_service(local_scope, "worker-spell".to_string(), &particle.id) + .await .ok()?; worker_service.service_type == ServiceType::Spell && worker_service.service_id == spell_id diff --git a/particle-services/Cargo.toml b/particle-services/Cargo.toml index c84aee7415..2c4153434b 100644 --- a/particle-services/Cargo.toml +++ b/particle-services/Cargo.toml @@ -20,6 +20,7 @@ peer-metrics = { workspace = true } uuid-utils = { workspace = true } now-millis = { workspace = true } workers = { workspace = true } +futures = { workspace = true } fluence-app-service = { workspace = true } diff --git a/particle-services/src/app_services.rs b/particle-services/src/app_services.rs index 9205ef7793..fd20fc4390 100644 --- a/particle-services/src/app_services.rs +++ b/particle-services/src/app_services.rs @@ -24,8 +24,8 @@ use fluence_app_service::{ MarineConfig, MarineError, MarineWASIConfig, ModuleDescriptor, SecurityTetraplet, ServiceInterface, WasmtimeConfig, }; +use futures::{stream, StreamExt}; use humantime_serde::re::humantime::format_duration as pretty; -use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value as JValue}; use tokio::runtime::Handle; @@ -85,18 +85,18 @@ pub struct ServiceInfo { #[derivative(Debug)] pub struct Service { #[derivative(Debug(format_with = "fmt_service"))] - pub service: Mutex, + pub service: tokio::sync::Mutex, pub service_id: String, pub blueprint_id: String, pub service_type: ServiceType, pub owner_id: PeerId, - pub aliases: RwLock>, + pub aliases: tokio::sync::RwLock>, pub peer_scope: PeerScope, } impl Service { pub fn new( - service: Mutex, + service: tokio::sync::Mutex, service_id: String, blueprint_id: String, service_type: ServiceType, @@ -110,36 +110,36 @@ impl Service { blueprint_id, service_type, owner_id, - aliases: RwLock::new(aliases), + aliases: tokio::sync::RwLock::new(aliases), peer_scope, } } - pub fn remove_alias(&self, alias: &str) { - let mut aliases = self.aliases.write(); + pub async fn remove_alias(&self, alias: &str) { + let mut aliases = self.aliases.write().await; if let Some(pos) = aliases.iter().position(|x| *x == alias) { aliases.remove(pos); } } - pub fn add_alias(&self, alias: String) { - self.aliases.write().push(alias); + pub async fn add_alias(&self, alias: String) { + self.aliases.write().await.push(alias); } - pub fn get_info(&self, id: &str) -> ServiceInfo { + pub async fn get_info(&self, id: &str) -> ServiceInfo { ServiceInfo { id: id.to_string(), blueprint_id: self.blueprint_id.clone(), service_type: self.service_type.clone(), owner_id: self.owner_id, - aliases: self.aliases.read().clone(), + aliases: self.aliases.read().await.clone(), peer_scope: self.peer_scope, } } } impl Deref for Service { - type Target = Mutex; + type Target = tokio::sync::Mutex; fn deref(&self) -> &Self::Target { &self.service @@ -147,7 +147,7 @@ impl Deref for Service { } fn fmt_service( - _: &Mutex, + _: &tokio::sync::Mutex, f: &mut std::fmt::Formatter<'_>, ) -> Result<(), std::fmt::Error> { f.debug_struct("Mutex").finish() @@ -164,8 +164,8 @@ pub struct VmDescriptor<'a> { #[derive(Derivative)] #[derivative(Debug, Clone, Default)] struct Services { - services: Arc>>>, - aliases: Arc>>, + services: Arc>>>, + aliases: Arc>>, } #[derive(Derivative)] @@ -177,7 +177,7 @@ pub struct ParticleAppServices { root_services: Services, #[derivative(Debug = "ignore")] root_runtime_handle: Handle, - worker_services: Arc>>, + worker_services: Arc>>, modules: ModuleRepository, #[derivative(Debug = "ignore")] workers: Arc, @@ -191,14 +191,18 @@ pub struct ParticleAppServices { app_service_epoch_ticker: EpochTicker, } -fn resolve_alias(services: &Services, alias: &String, particle_id: &str) -> Option { +async fn resolve_alias( + services: &Services, + alias: &String, + particle_id: &str, +) -> Option { if alias == "spell" || alias == "self" { if let Some(spell_id) = ParticleParams::get_spell_id(particle_id) { return Some(spell_id); } } - services.aliases.read().get(alias).cloned() + services.aliases.read().await.get(alias).cloned() } fn get_service( @@ -293,29 +297,30 @@ impl ParticleAppServices { Ok(service_id) } - pub fn service_exists(&self, peer_scope: &PeerScope, service_id: &str) -> bool { - let services = self.get_services(peer_scope); + pub async fn service_exists(&self, peer_scope: &PeerScope, service_id: &str) -> bool { + let services = self.get_services(peer_scope).await; match services { - Ok(services) => services.services.read().get(service_id).is_some(), + Ok(services) => services.services.read().await.get(service_id).is_some(), Err(_) => false, } } - pub fn get_service_info( + pub async fn get_service_info( &self, peer_scope: PeerScope, service_id_or_alias: String, particle_id: &str, ) -> Result { - let (service, service_id) = - self.get_service(peer_scope, service_id_or_alias, particle_id)?; + let (service, service_id) = self + .get_service(peer_scope, service_id_or_alias, particle_id) + .await?; - Ok(service.get_info(&service_id)) + Ok(service.get_info(&service_id).await) } pub async fn remove_services(&self, peer_scope: PeerScope) -> Result<(), ServiceError> { - let services = self.get_services(&peer_scope)?; - let service_ids: Vec = services.services.read().keys().cloned().collect(); + let services = self.get_services(&peer_scope).await?; + let service_ids: Vec = services.services.read().await.keys().cloned().collect(); for srv_id in service_ids { //TODO: can be parallelized @@ -329,10 +334,10 @@ impl ParticleAppServices { ) } } - let services = self.get_services(&peer_scope)?; + let services = self.get_services(&peer_scope).await?; - let mut aliases = services.aliases.write(); - let mut services = services.services.write(); + let mut aliases = services.aliases.write().await; + let mut services = services.services.write().await; aliases.clear(); services.clear(); @@ -350,8 +355,9 @@ impl ParticleAppServices { ) -> Result<(), ServiceError> { let removal_start_time = Instant::now(); let service_id = { - let (service, service_id) = - self.get_service(peer_scope, service_id_or_alias.to_string(), particle_id)?; + let (service, service_id) = self + .get_service(peer_scope, service_id_or_alias.to_string(), particle_id) + .await?; // tmp hack to forbid spell removal via srv.remove if service.service_type.is_spell() && !allow_remove_spell { @@ -399,15 +405,15 @@ impl ParticleAppServices { err ) } - let services = self.get_services(&peer_scope)?; - let mut aliases = services.aliases.write(); - let mut services = services.services.write(); + let services = self.get_services(&peer_scope).await?; + let mut aliases = services.aliases.write().await; + let mut services = services.services.write().await; let service = services.remove(service_id.as_str()).unwrap(); - let service_aliases = service.aliases.read(); + let service_aliases = service.aliases.read().await; for alias in service_aliases.iter() { aliases.remove(alias.as_str()); } - let service_type = self.get_service_type(&service, &service.peer_scope); + let service_type = self.get_service_type(&service, &service.peer_scope).await; let removal_end_time = removal_start_time.elapsed().as_secs(); if let Some(metrics) = self.metrics.as_ref() { @@ -417,7 +423,7 @@ impl ParticleAppServices { Ok(()) } - pub fn call_service( + pub async fn call_service( &self, function_args: Args, particle: ParticleParams, @@ -426,7 +432,9 @@ impl ParticleAppServices { let peer_scope = particle.peer_scope; let timestamp = particle.timestamp; - let service = self.get_service(peer_scope, function_args.service_id.clone(), &particle.id); + let service = self + .get_service(peer_scope, function_args.service_id.clone(), &particle.id) + .await; let (service, service_id) = match service { Ok(found) => found, @@ -449,7 +457,7 @@ impl ParticleAppServices { // )); // } // Metrics collection are enables for services with aliases which are installed on root worker or worker spells. - let service_type = self.get_service_type(service.as_ref(), &peer_scope); + let service_type = self.get_service_type(service.as_ref(), &peer_scope).await; // TODO: move particle vault creation to aquamarine::particle_functions if create_vault { @@ -486,36 +494,39 @@ impl ParticleAppServices { let function_name = function_args.function_name; let lock_acquire_start = Instant::now(); - let mut service = service.lock(); + let mut service = service.lock().await; let old_memory = service.module_memory_stats(); let old_mem_usage = ServicesMetricsBuiltin::get_used_memory(&old_memory); // TODO async-marine: set execution timeout https://github.com/fluencelabs/fluence/issues/1212 let call_time_start = Instant::now(); - let result = tokio::runtime::Handle::current() - .block_on(service.call_async( + + let result = service + .call_async( function_name.clone(), JValue::Array(function_args.function_args), params, - )) - .map_err(|e| { - if let Some(metrics) = self.metrics.as_ref() { - let stats = ServiceCallStats::Fail { timestamp }; - // If the called function is unknown we don't want to save info - // about it in a separate entry. - let function_name = if is_unknown_function(&e) { - None - } else { - Some(function_name.clone()) - }; - metrics.observe_service_state_failed( - service_id.clone(), - function_name, - service_type.clone(), - stats, - ); - } - ServiceError::Engine(e) - })?; + ) + .await; + + let result = result.map_err(|e| { + if let Some(metrics) = self.metrics.as_ref() { + let stats = ServiceCallStats::Fail { timestamp }; + // If the called function is unknown we don't want to save info + // about it in a separate entry. + let function_name = if is_unknown_function(&e) { + None + } else { + Some(function_name.clone()) + }; + metrics.observe_service_state_failed( + service_id.clone(), + function_name, + service_type.clone(), + stats, + ); + } + ServiceError::Engine(e) + })?; if let Some(metrics) = self.metrics.as_ref() { let call_time_sec = call_time_start.elapsed().as_secs_f64(); @@ -545,7 +556,7 @@ impl ParticleAppServices { // TODO: is it safe? #[allow(clippy::too_many_arguments)] - pub fn call_function( + pub async fn call_function( &self, peer_scope: PeerScope, service_id: &str, @@ -574,7 +585,7 @@ impl ParticleAppServices { token: "host_call".to_string(), }; - self.call_service(args, particle, false) + self.call_service(args, particle, false).await } async fn add_alias_inner( @@ -584,30 +595,30 @@ impl ParticleAppServices { service_id: ServiceId, ) -> Result<(), ServiceError> { let service = { - let services = self.get_or_create_services(peer_scope); - let mut aliases_service_id_mapping = services.aliases.write(); - let services_id_mapping = services.services.write(); + let services = self.get_or_create_services(peer_scope).await; + let mut aliases_service_id_mapping = services.aliases.write().await; + let services_id_mapping = services.services.write().await; let service = get_service(&services_id_mapping, peer_scope, service_id.clone())?; - service.add_alias(alias.clone()); + service.add_alias(alias.clone()).await; aliases_service_id_mapping.insert(alias, service_id); - PersistedService::from_service(service.as_ref()) + PersistedService::from_service(service.as_ref()).await }; service.persist(&self.config.services_dir).await } - fn get_or_create_services(&self, peer_scope: PeerScope) -> Services { + async fn get_or_create_services(&self, peer_scope: PeerScope) -> Services { match peer_scope { - PeerScope::WorkerId(worker_id) => self.get_or_create_worker_services(worker_id), + PeerScope::WorkerId(worker_id) => self.get_or_create_worker_services(worker_id).await, PeerScope::Host => self.root_services.clone(), } } - fn get_services(&self, peer_scope: &PeerScope) -> Result { + async fn get_services(&self, peer_scope: &PeerScope) -> Result { match peer_scope { PeerScope::WorkerId(worker_id) => { - let worker_services = self.worker_services.read(); + let worker_services = self.worker_services.read().await; let services = worker_services .get(worker_id) @@ -620,14 +631,14 @@ impl ParticleAppServices { } } - pub fn get_service( + pub async fn get_service( &self, peer_scope: PeerScope, id_or_alias: String, particle_id: &str, ) -> Result<(Arc, String), ServiceError> { - let services = self.get_services(&peer_scope)?; - let services_id_mapping = services.services.read(); + let services = self.get_services(&peer_scope).await?; + let services_id_mapping = services.services.read().await; // retrieve service by service id let service = get_service(&services_id_mapping, peer_scope, id_or_alias.clone()).ok(); @@ -638,6 +649,7 @@ impl ParticleAppServices { // retrieve service by alias let resolved_id = resolve_alias(&services, &id_or_alias, particle_id) + .await .ok_or(NoSuchService(id_or_alias.clone(), peer_scope))?; let service = get_service(&services_id_mapping, peer_scope, resolved_id.clone())?; @@ -645,10 +657,10 @@ impl ParticleAppServices { Ok((service, resolved_id)) } - fn get_service_id(&self, peer_scope: PeerScope, alias: &str) -> Option { - let services = self.get_services(&peer_scope); + async fn get_service_id(&self, peer_scope: PeerScope, alias: &str) -> Option { + let services = self.get_services(&peer_scope).await; match services { - Ok(services) => services.aliases.read().get(alias).cloned(), + Ok(services) => services.aliases.read().await.get(alias).cloned(), Err(_) => None, } } @@ -660,11 +672,11 @@ impl ParticleAppServices { service_id: &str, ) -> Result<(), ServiceError> { let service = { - let services = self.get_or_create_services(peer_scope); - let services_id_mapping = services.services.write(); + let services = self.get_or_create_services(peer_scope).await; + let services_id_mapping = services.services.write().await; let service = get_service(&services_id_mapping, peer_scope, service_id.to_string())?; - service.remove_alias(&alias); - PersistedService::from_service(service.as_ref()) + service.remove_alias(&alias).await; + PersistedService::from_service(service.as_ref()).await }; service.persist(&self.config.services_dir).await @@ -700,7 +712,7 @@ impl ParticleAppServices { } // alias can't be equal to any existent service id - if self.service_exists(&peer_scope, &alias) { + if self.service_exists(&peer_scope, &alias).await { return Err(AliasAsServiceId(alias)); } @@ -713,11 +725,11 @@ impl ParticleAppServices { return Err(ForbiddenAlias(alias)); } - if !self.service_exists(&peer_scope, &service_id) { + if !self.service_exists(&peer_scope, &service_id).await { return Err(NoSuchService(service_id, peer_scope)); } - let prev_srv_id = self.get_service_id(peer_scope, &alias); + let prev_srv_id = self.get_service_id(peer_scope, &alias).await; if let Some(srv_id) = prev_srv_id { self.remove_alias(alias.clone(), peer_scope, &srv_id) .await?; @@ -729,44 +741,52 @@ impl ParticleAppServices { Ok(()) } - pub fn resolve_alias( + pub async fn resolve_alias( &self, peer_scope: PeerScope, alias: String, particle_id: &str, ) -> Result { - let services = self.get_or_create_services(peer_scope); - resolve_alias(&services, &alias, particle_id).ok_or_else(|| NoSuchAlias(alias, peer_scope)) + let services = self.get_or_create_services(peer_scope).await; + resolve_alias(&services, &alias, particle_id) + .await + .ok_or_else(|| NoSuchAlias(alias, peer_scope)) } - pub fn to_service_id( + pub async fn to_service_id( &self, peer_scope: PeerScope, service_id_or_alias: String, particle_id: &str, ) -> Result { - let (_, service_id) = self.get_service(peer_scope, service_id_or_alias, particle_id)?; + let (_, service_id) = self + .get_service(peer_scope, service_id_or_alias, particle_id) + .await?; Ok(service_id) } - pub fn get_service_owner( + pub async fn get_service_owner( &self, peer_scope: PeerScope, id_or_alias: String, particle_id: &str, ) -> Result { - let (service, _) = self.get_service(peer_scope, id_or_alias, particle_id)?; + let (service, _) = self + .get_service(peer_scope, id_or_alias, particle_id) + .await?; Ok(service.owner_id) } - pub fn check_service_worker_id( + pub async fn check_service_worker_id( &self, peer_scope: PeerScope, id_or_alias: String, particle_id: &str, ) -> Result<(), ServiceError> { - let (service, _) = self.get_service(peer_scope, id_or_alias.clone(), particle_id)?; + let (service, _) = self + .get_service(peer_scope, id_or_alias.clone(), particle_id) + .await?; if service.peer_scope != peer_scope { Err(ServiceError::CallServiceFailedWrongWorker { @@ -778,66 +798,86 @@ impl ParticleAppServices { } } - pub fn get_interface( + pub async fn get_interface( &self, peer_scope: PeerScope, service_id: String, particle_id: &str, ) -> Result { - let (service, _) = self.get_service(peer_scope, service_id, particle_id)?; + let (service, _) = self + .get_service(peer_scope, service_id, particle_id) + .await?; Ok(self.modules.get_facade_interface(&service.blueprint_id)?) } - pub fn list_services_all(&self) -> Vec { - let root_info: Vec = self - .root_services - .services - .read() - .iter() - .map(|(id, service)| service.get_info(id)) - .collect(); + pub async fn list_services_all(&self) -> Vec { + let root_services = self.root_services.services.read().await; + + let root_info: Vec = stream::iter(root_services.iter()) + .then(|(id, service)| async { service.get_info(id).await }) + .collect() + .await; + + drop(root_services); + + let worker_services = self.worker_services.read().await; + + let mut result: Vec = futures::stream::iter(worker_services.iter()) + .then(|(_, services)| async { + let services = services.services.read().await; + let services: Vec = futures::stream::iter(services.iter()) + .then(|(id, service)| { + let id = id.clone(); + let service = service.clone(); + async move { service.get_info(id.as_str()).await } + }) + .collect() + .await; - let mut result: Vec = self - .worker_services - .read() - .values() - .flat_map(|services| { - let services = services.services.read(); services - .iter() - .map(|(id, service)| service.get_info(id)) - .collect::>() }) - .collect(); + .fold(Vec::new(), |mut acc, item| async { + acc.extend(item); + acc + }) + .await; result.extend(root_info); result } - pub fn list_services(&self, peer_scope: PeerScope) -> Vec { - let services = self.get_services(&peer_scope); + pub async fn list_services(&self, peer_scope: PeerScope) -> Vec { + let services = self.get_services(&peer_scope).await; match services { - Ok(services) => services - .services - .read() - .iter() - .map(|(id, srv)| srv.get_info(id)) - .collect(), + Ok(services) => { + let services = services.services.read().await; + + futures::stream::iter(services.iter()) + .then(|(id, srv)| { + let srv = srv.clone(); + let id = id.clone(); + async move { srv.get_info(&id).await } + }) + .collect() + .await + } Err(_) => { vec![] } } } - pub fn get_service_mem_stats( + pub async fn get_service_mem_stats( &self, peer_scope: PeerScope, service_id: String, particle_id: &str, ) -> Result, JError> { - let (service, _) = self.get_service(peer_scope, service_id, particle_id)?; + let (service, _) = self + .get_service(peer_scope, service_id, particle_id) + .await?; - let lock = service.service.lock(); + let lock = service.service.lock().await; let stats = lock.module_memory_stats(); let stats = stats .modules @@ -903,8 +943,8 @@ impl ParticleAppServices { match service.peer_scope { PeerScope::WorkerId(worker_id) => { - let services = self.get_or_create_worker_services(worker_id); - let mut aliases = services.aliases.write(); + let services = self.get_or_create_worker_services(worker_id).await; + let mut aliases = services.aliases.write().await; for alias in service.aliases.iter() { let old = aliases.insert(alias.clone(), service.service_id.clone()); if let Some(old) = old { @@ -918,7 +958,7 @@ impl ParticleAppServices { } } PeerScope::Host => { - let mut aliases = self.root_services.aliases.write(); + let mut aliases = self.root_services.aliases.write().await; for alias in service.aliases.iter() { let old = aliases.insert(alias.clone(), service.service_id.clone()); if let Some(old) = old { @@ -979,7 +1019,7 @@ impl ParticleAppServices { let stats = ServiceMemoryStat::new(&stats); let service = Service::new( - Mutex::new(service), + tokio::sync::Mutex::new(service), service_id.clone(), blueprint_id, service_type, @@ -989,13 +1029,14 @@ impl ParticleAppServices { ); let service = Arc::new(service); // Save created service to disk, so it is recreated on restart - let persisted_service = PersistedService::from_service(&service); + let persisted_service = PersistedService::from_service(&service).await; persisted_service.persist(&self.config.services_dir).await?; - let service_type = self.get_service_type(&service, &peer_scope); - let services = self.get_or_create_services(peer_scope); + let service_type = self.get_service_type(&service, &peer_scope).await; + let services = self.get_or_create_services(peer_scope).await; let replaced = services .services .write() + .await .insert(service_id.clone(), service); if let Some(m) = self.metrics.as_ref() { @@ -1006,12 +1047,12 @@ impl ParticleAppServices { Ok(replaced) } - fn get_or_create_worker_services(&self, worker_id: WorkerId) -> Services { - let workers_services = self.worker_services.upgradable_read(); + async fn get_or_create_worker_services(&self, worker_id: WorkerId) -> Services { + let workers_services = self.worker_services.read().await; let worker_services = workers_services.get(&worker_id); match worker_services { None => { - let mut workers_services = RwLockUpgradableReadGuard::upgrade(workers_services); + let mut workers_services = self.worker_services.write().await; //we double check it, because it can be created in another thread let services = workers_services.get(&worker_id); match services { @@ -1143,13 +1184,18 @@ impl ParticleAppServices { .map_err(ServiceError::Engine) } - fn get_service_type(&self, service: &Service, peer_scope: &PeerScope) -> MetricServiceType { + async fn get_service_type( + &self, + service: &Service, + peer_scope: &PeerScope, + ) -> MetricServiceType { let allowed_alias = match peer_scope { - PeerScope::Host => service.aliases.read().first().cloned(), + PeerScope::Host => service.aliases.read().await.first().cloned(), _ => { if service .aliases .read() + .await .first() .map(|alias| alias == "worker-spell") .unwrap_or(false) diff --git a/particle-services/src/persistence.rs b/particle-services/src/persistence.rs index 69af5f90f2..9195982643 100644 --- a/particle-services/src/persistence.rs +++ b/particle-services/src/persistence.rs @@ -46,12 +46,12 @@ pub struct PersistedService { } impl PersistedService { - pub fn from_service(service: &Service) -> Self { + pub async fn from_service(service: &Service) -> Self { PersistedService { service_id: service.service_id.clone(), service_type: Some(service.service_type.clone()), blueprint_id: service.blueprint_id.clone(), - aliases: service.aliases.read().clone(), + aliases: service.aliases.read().await.clone(), owner_id: service.owner_id, peer_scope: service.peer_scope, } diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index c3b61bbdd7..148cebbbb6 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -27,7 +27,11 @@ use spell_event_bus::api::{TriggerEvent, TriggerInfoAqua}; use spell_service_api::CallParams; impl Sorcerer { - fn get_spell_counter(&self, peer_scope: PeerScope, spell_id: String) -> Result { + async fn get_spell_counter( + &self, + peer_scope: PeerScope, + spell_id: String, + ) -> Result { let init_peer_id = self.scopes.to_peer_id(peer_scope); let params = CallParams::local( peer_scope, @@ -35,13 +39,13 @@ impl Sorcerer { init_peer_id, self.spell_script_particle_ttl, ); - let counter = self.spell_service_api.get_counter(params)?; + let counter = self.spell_service_api.get_counter(params).await?; // If the counter does not exist, consider it to be 0. // It will be incremented afterwards to 1 anyway. Ok(counter.unwrap_or(0u32)) } - fn set_spell_next_counter( + async fn set_spell_next_counter( &self, peer_scope: PeerScope, spell_id: String, @@ -56,10 +60,15 @@ impl Sorcerer { ); self.spell_service_api .set_counter(params, next_counter) + .await .map_err(|e| JError::new(e.to_string())) } - fn get_spell_script(&self, peer_scope: PeerScope, spell_id: String) -> Result { + async fn get_spell_script( + &self, + peer_scope: PeerScope, + spell_id: String, + ) -> Result { let init_peer_id = self.scopes.to_peer_id(peer_scope); let params = CallParams::local( peer_scope, @@ -69,11 +78,12 @@ impl Sorcerer { ); self.spell_service_api .get_script(params) + .await .map_err(|e| JError::new(e.to_string())) } #[instrument(level = tracing::Level::INFO, skip_all)] - pub(crate) fn make_spell_particle( + pub(crate) async fn make_spell_particle( &self, peer_scope: PeerScope, spell_id: String, @@ -86,9 +96,10 @@ impl Sorcerer { peer_scope, })?; - let spell_counter = self.get_spell_counter(peer_scope, spell_id.clone())?; - self.set_spell_next_counter(peer_scope, spell_id.clone(), spell_counter + 1)?; - let spell_script = self.get_spell_script(peer_scope, spell_id.clone())?; + let spell_counter = self.get_spell_counter(peer_scope, spell_id.clone()).await?; + self.set_spell_next_counter(peer_scope, spell_id.clone(), spell_counter + 1) + .await?; + let spell_script = self.get_spell_script(peer_scope, spell_id.clone()).await?; let init_peer_id: PeerId = match peer_scope { PeerScope::WorkerId(worker_id) => worker_id.into(), PeerScope::Host => self.scopes.get_host_peer_id(), @@ -110,7 +121,7 @@ impl Sorcerer { Ok(particle) } - pub(crate) fn store_trigger( + pub(crate) async fn store_trigger( &self, event: TriggerEvent, peer_scope: PeerScope, @@ -125,6 +136,7 @@ impl Sorcerer { ); self.spell_service_api .set_trigger_event(params, serialized_event) + .await .map_err(|e| JError::new(e.to_string())) } @@ -136,9 +148,11 @@ impl Sorcerer { .get_scope(event.spell_id.clone()) .expect("Scope not found"); - let particle = self.make_spell_particle(peer_scope, event.spell_id.clone())?; + let particle = self + .make_spell_particle(peer_scope, event.spell_id.clone()) + .await?; - self.store_trigger(event.clone(), peer_scope)?; + self.store_trigger(event.clone(), peer_scope).await?; if let Some(m) = &self.spell_metrics { m.observe_spell_cast(); } diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index f746d8920d..1562bb0e70 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -63,7 +63,7 @@ pub struct Sorcerer { impl Sorcerer { #[allow(clippy::too_many_arguments)] - pub fn new( + pub async fn new( services: ParticleAppServices, modules: ModuleRepository, aquamarine: AquamarineApi, @@ -77,6 +77,7 @@ impl Sorcerer { ) -> (Self, HashMap, String) { let (spell_storage, spell_version) = SpellStorage::create(&config.dir_config.spell_base_dir, &services, &modules) + .await .expect("Spell storage creation"); let sorcerer = Self { @@ -104,16 +105,17 @@ impl Sorcerer { for spell_id in spells { log::info!("Rescheduling spell {} on {:?} peer", spell_id, peer_scope); let result: Result<(), JError> = try { - let spell_owner = - self.services - .get_service_owner(peer_scope, spell_id.clone(), "")?; + let spell_owner = self + .services + .get_service_owner(peer_scope, spell_id.clone(), "") + .await?; let params = CallParams::local( peer_scope, spell_id.clone(), spell_owner, self.spell_script_particle_ttl, ); - let config = self.spell_service_api.get_trigger_config(params)?; + let config = self.spell_service_api.get_trigger_config(params).await?; let period = config.clock.period_sec; let config = from_user_config(&config)?; if let Some(config) = config.and_then(|c| c.into_rescheduled()) { @@ -346,7 +348,7 @@ impl Sorcerer { let spell_service_api = self.spell_service_api.clone(); ServiceFunction::Immut(Box::new(move |args, params| { let spell_service_api = spell_service_api.clone(); - async move { wrap(get_spell_arg(args, params, spell_service_api)) }.boxed() + async move { wrap(get_spell_arg(args, params, spell_service_api).await) }.boxed() })) } @@ -354,7 +356,7 @@ impl Sorcerer { let spell_service_api = self.spell_service_api.clone(); ServiceFunction::Immut(Box::new(move |args, params| { let spell_service_api = spell_service_api.clone(); - async move { wrap_unit(store_error(args, params, spell_service_api)) }.boxed() + async move { wrap_unit(store_error(args, params, spell_service_api).await) }.boxed() })) } @@ -362,7 +364,7 @@ impl Sorcerer { let spell_service_api = self.spell_service_api.clone(); ServiceFunction::Immut(Box::new(move |args, params| { let spell_service_api = spell_service_api.clone(); - async move { wrap_unit(store_response(args, params, spell_service_api)) }.boxed() + async move { wrap_unit(store_response(args, params, spell_service_api).await) }.boxed() })) } diff --git a/sorcerer/src/spell_builtins.rs b/sorcerer/src/spell_builtins.rs index 6a5d91cfc8..b0fa6718c3 100644 --- a/sorcerer/src/spell_builtins.rs +++ b/sorcerer/src/spell_builtins.rs @@ -82,7 +82,7 @@ pub async fn install_spell( let params = CallParams::local(peer_scope, spell_id.clone(), owner_id, ttl); // Save the script to the spell - spell_service_api.set_script(params.clone(), script)?; + spell_service_api.set_script(params.clone(), script).await?; // Save init_data to the spell's KV let self_particle_id = format!("spell_{spell_id}_0"); let self_params = CallParams::new( @@ -92,9 +92,13 @@ pub async fn install_spell( Some(self_particle_id), ttl, ); - spell_service_api.update_kv(self_params.clone(), init_data)?; + spell_service_api + .update_kv(self_params.clone(), init_data) + .await?; // Save trigger config - spell_service_api.set_trigger_config(params, user_config)?; + spell_service_api + .set_trigger_config(params, user_config) + .await?; if let Some(config) = config { // Scheduling the spell @@ -130,7 +134,7 @@ pub struct SpellInfo { pub trigger_config: TriggerConfig, } -pub fn get_spell_info( +pub async fn get_spell_info( spell_service_api: &SpellServiceApi, peer_scope: PeerScope, ttl: Duration, @@ -140,10 +144,12 @@ pub fn get_spell_info( let params = CallParams::local(peer_scope, spell_id.clone(), init_peer_id, ttl); let trigger_config = spell_service_api .get_trigger_config(params.clone()) + .await .map_err(|e| JError::new(f!("Failed to get trigger_config for spell {spell_id}: {e}")))?; let script = spell_service_api .get_script(params) + .await .map_err(|e| JError::new(f!("Failed to get trigger_config for spell {spell_id}: {e}")))?; Ok(SpellInfo { script, @@ -290,7 +296,9 @@ pub(crate) async fn spell_remove( } }; - let spell_id = services.to_service_id(params.peer_scope, spell_id, ¶ms.id)?; + let spell_id = services + .to_service_id(params.peer_scope, spell_id, ¶ms.id) + .await?; remove_spell( ¶ms.id, @@ -343,7 +351,9 @@ pub(crate) async fn spell_update_config( } } - let spell_id = services.to_service_id(peer_scope, spell_id_or_alias.clone(), ¶ms.id)?; + let spell_id = services + .to_service_id(peer_scope, spell_id_or_alias.clone(), ¶ms.id) + .await?; let user_config: TriggerConfig = Args::next("config", &mut args)?; let config = api::from_user_config(&user_config)?; @@ -354,7 +364,9 @@ pub(crate) async fn spell_update_config( init_peer_id, Duration::from_millis(params.ttl as u64), ); - spell_service_api.set_trigger_config(params, user_config)?; + spell_service_api + .set_trigger_config(params, user_config) + .await?; let result: Result<(), EventBusError> = try { // we unsubscribe the spell from the current config anyway @@ -383,7 +395,7 @@ pub(crate) fn get_spell_id(params: ParticleParams) -> Result { Ok(json!(parse_spell_id_from(¶ms)?)) } -pub(crate) fn get_spell_arg( +pub(crate) async fn get_spell_arg( args: Args, params: ParticleParams, spell_service_api: SpellServiceApi, @@ -394,6 +406,7 @@ pub(crate) fn get_spell_arg( let str_value = spell_service_api .get_string(call_params, key.clone()) + .await .map_err(|e| JError::new(f!("Failed to get argument {key} for spell {spell_id}: {e}"))) .and_then(|value| value.ok_or_else(|| JError::new("value not found")))?; @@ -404,7 +417,7 @@ pub(crate) fn get_spell_arg( }) } -pub(crate) fn store_error( +pub(crate) async fn store_error( mut args: Args, params: ParticleParams, spell_service_api: SpellServiceApi, @@ -415,6 +428,7 @@ pub(crate) fn store_error( let call_params = CallParams::from(spell_id.clone(), params); spell_service_api .store_error(call_params, args.function_args.clone()) + .await .map_err(|e| { JError::new(format!( "Failed to store error {:?} for spell {}: {}", @@ -423,7 +437,7 @@ pub(crate) fn store_error( }) } -pub(crate) fn store_response( +pub(crate) async fn store_response( args: Args, params: ParticleParams, spell_service_api: SpellServiceApi, @@ -435,6 +449,7 @@ pub(crate) fn store_response( let call_params = CallParams::from(spell_id.clone(), params); spell_service_api .update_kv(call_params, response.clone()) + .await .map_err(|err| { JError::new(format!( "Failed to store response {response} for spell {spell_id}: {err}" diff --git a/sorcerer/src/worker_builins.rs b/sorcerer/src/worker_builins.rs index 5aa7f70566..f22f8099d3 100644 --- a/sorcerer/src/worker_builins.rs +++ b/sorcerer/src/worker_builins.rs @@ -179,6 +179,7 @@ pub(crate) async fn deactivate_deal( ), TriggerConfig::default(), ) + .await .map_err(|e| { JError::new(format!( "Deal deactivation failed due to failure to stop spell {spell_id} : {e}" @@ -216,26 +217,30 @@ pub(crate) async fn activate_deal( return Err(JError::new("Deal has already been activated")); } - let installation_spell_id = services.resolve_alias( - PeerScope::WorkerId(worker_id), - "worker-spell".to_string(), - ¶ms.id, - )?; + let installation_spell_id = services + .resolve_alias( + PeerScope::WorkerId(worker_id), + "worker-spell".to_string(), + ¶ms.id, + ) + .await?; // same as in decider-distro let mut worker_config = TriggerConfig::default(); worker_config.clock.start_sec = 1; worker_config.clock.period_sec = worker_period_sec; - spell_service_api.set_trigger_config( - CallParams::local( - PeerScope::WorkerId(worker_id), - installation_spell_id.clone(), - worker_id.into(), - Duration::from_millis(params.ttl as u64), - ), - worker_config.clone(), - )?; + spell_service_api + .set_trigger_config( + CallParams::local( + PeerScope::WorkerId(worker_id), + installation_spell_id.clone(), + worker_id.into(), + Duration::from_millis(params.ttl as u64), + ), + worker_config.clone(), + ) + .await?; let trigger_config = from_user_config(&worker_config)?.ok_or(JError::new( "Deal activation failed due to failure to parse trigger config", diff --git a/spell-storage/src/storage.rs b/spell-storage/src/storage.rs index 0fe1505565..15ecece1ef 100644 --- a/spell-storage/src/storage.rs +++ b/spell-storage/src/storage.rs @@ -25,7 +25,7 @@ pub struct SpellStorage { } impl SpellStorage { - pub fn create( + pub async fn create( spells_base_dir: &Path, services: &ParticleAppServices, modules: &ModuleRepository, @@ -37,7 +37,7 @@ impl SpellStorage { } else { Self::load_spell_service_from_crate(modules)? }; - let (registered_spells, scope_mapping) = Self::restore_spells(services); + let (registered_spells, scope_mapping) = Self::restore_spells(services).await; Ok(( Self { @@ -103,7 +103,7 @@ impl SpellStorage { )) } - fn restore_spells( + async fn restore_spells( services: &ParticleAppServices, ) -> ( HashMap>, @@ -114,6 +114,7 @@ impl SpellStorage { let spell_services = services .list_services_all() + .await .into_iter() .filter(|s| s.service_type.is_spell());