Skip to content

Commit

Permalink
remove block_on for marine call
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Apr 5, 2024
1 parent 4a7a6fc commit 83092fe
Show file tree
Hide file tree
Showing 21 changed files with 669 additions and 439 deletions.
22 changes: 16 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ fluence-spell-distro = "=0.7.5"
fluence-app-service = { version = "=0.35.2", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call", features = ["wasmtime"] }
marine-utils = "0.5.1"
marine-it-parser = { version = "0.16.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" }
marine-module-info-parser = { version = "0.15.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" }
marine-wasmtime-backend = { version = "0.6.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" }
marine-module-info-parser = { version = "0.15.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" }
marine-wasmtime-backend = { version = "0.6.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" }

# avm
avm-server = { version = "=0.37.0", path = "../aquavm/avm/server" }
avm-server = { git = "https://github.com/fluencelabs/aquavm", version = "=0.37.0", branch = "feat/use-async-marine" }
air-interpreter-wasm = "=0.62.0"

# libp2p
Expand Down
23 changes: 12 additions & 11 deletions aquamarine/src/aqua_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use avm_server::avm_runner::{AVMRunner, RawAVMOutcome};
use avm_server::{
AVMMemoryStats, AVMRuntimeLimits, CallRequests, CallResults, ParticleParameters, RunnerError,
};
use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend};
use fluence_keypair::KeyPair;
use libp2p::PeerId;
use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend};
use tracing::Level;

use crate::config::VmConfig;
Expand Down Expand Up @@ -72,22 +72,23 @@ impl AquaRuntime for AVMRunner<WasmtimeWasmBackend> {
.epoch_interruption(true)
.async_wasm_stack(2 * 1024 * 1024)
.max_wasm_stack(2 * 1024 * 1024);
let backend = WasmtimeWasmBackend::new(wasmtime_config)
.map_err(|e| Self::Error::MarineError(
fluence_app_service::MarineError::EngineError(e.into())))?;
let backend = WasmtimeWasmBackend::new(wasmtime_config).map_err(|e| {
Self::Error::MarineError(fluence_app_service::MarineError::EngineError(e.into()))
})?;
let avm_runtime_limits = AVMRuntimeLimits::new(
config.air_size_limit,
config.particle_size_limit,
config.call_result_size_limit,
config.hard_limit_enabled,
);
let vm: AVMRunner<WasmtimeWasmBackend> = tokio::runtime::Handle::current().block_on(AVMRunner::new(
config.air_interpreter,
config.max_heap_size,
avm_runtime_limits,
i32::MAX,
backend,
))?;
let vm: AVMRunner<WasmtimeWasmBackend> =
tokio::runtime::Handle::current().block_on(AVMRunner::new(
config.air_interpreter,
config.max_heap_size,
avm_runtime_limits,
i32::MAX,
backend,
))?;
waker.wake();
Ok(vm)
}
Expand Down
27 changes: 16 additions & 11 deletions crates/local-vm/src/local_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use std::{collections::HashMap, time::Duration};

use avm_server::avm_runner::{AVMRunner, RawAVMOutcome};
use avm_server::{CallResults, CallServiceResult};
use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend};
use fstrings::f;
use libp2p::PeerId;
use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend};
use serde_json::{json, Value as JValue};

use air_interpreter_fs::{air_interpreter_path, write_default_air_interpreter};
Expand Down Expand Up @@ -180,24 +180,29 @@ pub fn make_wasm_backend() -> WasmtimeWasmBackend {
.epoch_interruption(true)
.async_wasm_stack(2 * 1024 * 1024)
.max_wasm_stack(2 * 1024 * 1024);
WasmtimeWasmBackend::new(wasmtime_config)
.expect("Cannot create WasmtimeWasmBackend")
WasmtimeWasmBackend::new(wasmtime_config).expect("Cannot create WasmtimeWasmBackend")
}

pub async fn make_vm(tmp_dir_path: &Path) -> AVMRunner<WasmtimeWasmBackend> {
let interpreter = air_interpreter_path(tmp_dir_path);
write_default_air_interpreter(&interpreter).expect("write air interpreter");

AVMRunner::new(interpreter, None, <_>::default(), i32::MAX, make_wasm_backend())
.await
.map_err(|err| {
log::error!("\n\n\nFailed to create local AVM: {:#?}\n\n\n", err);
AVMRunner::new(
interpreter,
None,
<_>::default(),
i32::MAX,
make_wasm_backend(),
)
.await
.map_err(|err| {
log::error!("\n\n\nFailed to create local AVM: {:#?}\n\n\n", err);

println!("\n\n\nFailed to create local AVM: {err:#?}\n\n\n");
println!("\n\n\nFailed to create local AVM: {err:#?}\n\n\n");

err
})
.expect("vm should be created")
err
})
.expect("vm should be created")
}

pub fn wrap_script(
Expand Down
2 changes: 1 addition & 1 deletion crates/nox-tests/tests/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn test_system_service_override() {
config,
};
let name = service_name.clone();
let init: system_services::InitService = Box::new(move |call, status| {
let init: dyn system_services::InitService = Box::new(move |call, status| {
let name = name.clone();
let service_status = status
.services
Expand Down
74 changes: 43 additions & 31 deletions crates/spell-service-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,24 +111,24 @@ impl SpellServiceApi {
Self { services }
}

pub fn set_script(&self, params: CallParams, script: String) -> Result<(), CallError> {
pub async fn set_script(&self, params: CallParams, script: String) -> Result<(), CallError> {
let function = Function {
name: "set_script",
args: vec![json!(script)],
};
let _ = self.call::<UnitValue>(params, function)?;
let _ = self.call::<UnitValue>(params, function).await?;
Ok(())
}

pub fn get_script(&self, params: CallParams) -> Result<String, CallError> {
pub async fn get_script(&self, params: CallParams) -> Result<String, CallError> {
let function = Function {
name: "get_script",
args: vec![],
};
let script_value = self.call::<ScriptValue>(params, function)?;
let script_value = self.call::<ScriptValue>(params, function).await?;
Ok(script_value.value)
}
pub fn set_trigger_config(
pub async fn set_trigger_config(
&self,
params: CallParams,
config: TriggerConfig,
Expand All @@ -137,39 +137,43 @@ impl SpellServiceApi {
name: "set_trigger_config",
args: vec![json!(config)],
};
let _ = self.call::<UnitValue>(params, function)?;
let _ = self.call::<UnitValue>(params, function).await?;
Ok(())
}

pub fn get_trigger_config(&self, params: CallParams) -> Result<TriggerConfig, CallError> {
pub async fn get_trigger_config(&self, params: CallParams) -> Result<TriggerConfig, CallError> {
let function = Function {
name: "get_trigger_config",
args: vec![],
};
let trigger_config_value = self.call::<TriggerConfigValue>(params, function)?;
let trigger_config_value = self.call::<TriggerConfigValue>(params, function).await?;
Ok(trigger_config_value.config)
}

// TODO: use `Map<String, Value>` for init_data instead of `Value`
pub fn update_kv(&self, params: CallParams, kv_data: Value) -> Result<(), CallError> {
pub async fn update_kv(&self, params: CallParams, kv_data: Value) -> Result<(), CallError> {
let function = Function {
name: "set_json_fields",
args: vec![json!(kv_data.to_string())],
};
let _ = self.call::<UnitValue>(params, function)?;
let _ = self.call::<UnitValue>(params, function).await?;
Ok(())
}

pub fn get_string(&self, params: CallParams, key: String) -> Result<Option<String>, CallError> {
pub async fn get_string(
&self,
params: CallParams,
key: String,
) -> Result<Option<String>, CallError> {
let function = Function {
name: "get_string",
args: vec![json!(key)],
};
let result = self.call::<StringValue>(params, function)?;
let result = self.call::<StringValue>(params, function).await?;
Ok((!result.absent).then_some(result.value))
}

pub fn set_string(
pub async fn set_string(
&self,
params: CallParams,
key: String,
Expand All @@ -179,60 +183,68 @@ impl SpellServiceApi {
name: "set_string",
args: vec![json!(key), json!(value)],
};
let _ = self.call::<UnitValue>(params, function)?;
let _ = self.call::<UnitValue>(params, function).await?;
Ok(())
}

/// Load the counter (how many times the spell was run)
pub fn get_counter(&self, params: CallParams) -> Result<Option<u32>, CallError> {
pub async fn get_counter(&self, params: CallParams) -> Result<Option<u32>, CallError> {
let function = Function {
name: "get_u32",
args: vec![json!("hw_counter")],
};
let result = self.call::<U32Value>(params, function)?;
let result = self.call::<U32Value>(params, function).await?;
Ok((!result.absent).then_some(result.value))
}

/// Update the counter (how many times the spell was run)
/// TODO: permission check here or not?
pub fn set_counter(&self, params: CallParams, counter: u32) -> Result<(), CallError> {
pub async fn set_counter(&self, params: CallParams, counter: u32) -> Result<(), CallError> {
let function = Function {
name: "set_u32",
args: vec![json!("hw_counter"), json!(counter)],
};
let _ = self.call::<UnitValue>(params, function)?;
let _ = self.call::<UnitValue>(params, function).await?;

Ok(())
}

pub fn set_trigger_event(&self, params: CallParams, event: String) -> Result<(), CallError> {
pub async fn set_trigger_event(
&self,
params: CallParams,
event: String,
) -> Result<(), CallError> {
self.set_string(params, "hw_trigger".to_string(), event)
.await
}

pub fn store_error(&self, params: CallParams, args: Vec<Value>) -> Result<(), CallError> {
pub async fn store_error(&self, params: CallParams, args: Vec<Value>) -> Result<(), CallError> {
let function = Function {
name: "store_error",
args,
};
let _ = self.call::<UnitValue>(params, function)?;
let _ = self.call::<UnitValue>(params, function).await?;
Ok(())
}

fn call<T>(&self, params: CallParams, function: Function) -> Result<T, CallError>
async fn call<T>(&self, params: CallParams, function: Function) -> Result<T, CallError>
where
T: DeserializeOwned + SpellValueT,
{
use CallError::*;
let spell_id = params.spell_id;
let result = self.services.call_function(
params.peer_scope,
&spell_id,
function.name,
function.args,
params.particle_id,
params.init_peer_id,
params.ttl,
);
let result = self
.services
.call_function(
params.peer_scope,
&spell_id,
function.name,
function.args,
params.particle_id,
params.init_peer_id,
params.ttl,
)
.await;
match result {
FunctionOutcome::NotDefined { .. } => Err(ServiceNotFound {
spell_id,
Expand Down
Loading

0 comments on commit 83092fe

Please sign in to comment.