diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 0568fcb2f6..9492eaaccd 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -82,23 +82,21 @@ jobs: cli: needs: - nox-snapshot - uses: fluencelabs/cli/.github/workflows/tests.yml@renovate/fluencelabs-js-client-0.x + uses: fluencelabs/cli/.github/workflows/tests.yml@main with: - ref: renovate/fluencelabs-js-client-0.x nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}" js-client: needs: - nox-snapshot - uses: fluencelabs/js-client/.github/workflows/tests.yml@master + uses: fluencelabs/js-client/.github/workflows/tests.yml@main with: - ref: js-client-v0.9.0 nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}" aqua: needs: - nox-snapshot - uses: fluencelabs/aqua/.github/workflows/tests.yml@renovate/fluencelabs-js-client-0.x + uses: fluencelabs/aqua/.github/workflows/tests.yml@main with: nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}" ref: renovate/fluencelabs-js-client-0.x diff --git a/Cargo.lock b/Cargo.lock index e2e4885edd..c81c04ff88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5418,6 +5418,7 @@ dependencies = [ "tracing-log", "tracing-logfmt", "tracing-opentelemetry", + "tracing-panic", "tracing-subscriber", "workers", ] @@ -8314,6 +8315,16 @@ dependencies = [ "web-time", ] +[[package]] +name = "tracing-panic" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaf80030ce049691c9922d75be63cadf345110a245cd4581833c66f87c02ad25" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" diff --git a/crates/created-swarm/src/swarm.rs b/crates/created-swarm/src/swarm.rs index f659111cad..c3a6cdca01 100644 --- a/crates/created-swarm/src/swarm.rs +++ b/crates/created-swarm/src/swarm.rs @@ -46,7 +46,7 @@ use server_config::{ UnresolvedConfig, }; use tempfile::TempDir; -use test_constants::{EXECUTION_TIMEOUT, TRANSPORT_TIMEOUT}; +use test_constants::{EXECUTION_TIMEOUT, IDLE_CONNECTION_TIMEOUT, TRANSPORT_TIMEOUT}; use tokio::sync::oneshot; use toy_vms::EasyVM; use tracing::{Instrument, Span}; @@ -403,6 +403,7 @@ pub async fn create_swarm_with_runtime( resolved.node_config.aquavm_pool_size = config.pool_size.unwrap_or(1); resolved.node_config.particle_execution_timeout = EXECUTION_TIMEOUT; + resolved.node_config.transport_config.connection_idle_timeout = IDLE_CONNECTION_TIMEOUT; let allowed_effectors = config.allowed_effectors.iter().map(|(cid, binaries)| { (Hash::from_string(cid).unwrap(), binaries.clone()) diff --git a/crates/nox-tests/tests/workers.rs b/crates/nox-tests/tests/workers.rs index e554d98b59..236531fc0f 100644 --- a/crates/nox-tests/tests/workers.rs +++ b/crates/nox-tests/tests/workers.rs @@ -2,11 +2,12 @@ use connected_client::ConnectedClient; use created_swarm::make_swarms; use eyre::Context; use hex::FromHex; +use log_utils::enable_logs; use maplit::hashmap; -use serde_json::json; +use serde_json::{json, Value}; use workers::CUID; -async fn create_worker(client: &mut ConnectedClient, deal_id: &str) -> String { +pub(crate) async fn create_worker(client: &mut ConnectedClient, deal_id: &str) -> String { let init_id_1 = ::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea") .unwrap(); @@ -116,3 +117,52 @@ async fn test_worker_different_deal_ids() { assert!(is_worker_active(&mut client, deal_id_lowercase_prefix).await); assert!(is_worker_active(&mut client, deal_id_mixed_prefix).await); } + +#[tokio::test] +async fn test_resolve_subnet_on_worker() { + let deal_id = "0x9DcaFca9B88f49d91c38a32E7d9A86a7d9a37B04"; + + enable_logs(); + let script = tokio::fs::read("./tests/workers/test_subnet_resolve_on_worker.air") + .await + .wrap_err("read test data") + .unwrap(); + let script = String::from_utf8(script) + .wrap_err("decode test data") + .unwrap(); + + let swarms = make_swarms(1).await; + + let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone()) + .await + .wrap_err("connect client") + .unwrap(); + + let worker_id = create_worker(&mut client, deal_id).await; + + let data = hashmap! { + "-relay-" => json!(swarms[0].peer_id.to_string()), + "-worker_id-" => json!(worker_id), + "-deal_id-" => json!(deal_id), + }; + + let result = client + .execute_particle(script.clone(), data.clone()) + .await + .wrap_err("execute particle") + .unwrap(); + + let expected = { + let error = Value::Array( + vec![Value::String("error sending jsonrpc request: 'Networking or low-level protocol error: Server returned an error status code: 429'".to_string())] + ); + let mut object_map = serde_json::Map::new(); + object_map.insert("error".to_string(), error); + object_map.insert("success".to_string(), Value::Bool(false)); + object_map.insert("workers".to_string(), Value::Array(vec![])); + + vec![Value::Object(object_map)] + }; + + assert_eq!(result, expected) +} diff --git a/crates/nox-tests/tests/workers/test_subnet_resolve_on_worker.air b/crates/nox-tests/tests/workers/test_subnet_resolve_on_worker.air new file mode 100644 index 0000000000..7820c6e7ce --- /dev/null +++ b/crates/nox-tests/tests/workers/test_subnet_resolve_on_worker.air @@ -0,0 +1,13 @@ +(xor + (seq + (call -relay- ("op" "noop") []) + (seq + (call -worker_id- ("op" "noop") []) + (seq + (call -worker_id- ("subnet" "resolve") [-deal_id-] subnet) + (call %init_peer_id% ("op" "return") [subnet]) + ) + ) + ) + (call %init_peer_id% ("op" "return") [%last_error%.$.instruction]) +) diff --git a/crates/server-config/src/defaults.rs b/crates/server-config/src/defaults.rs index b3b287e3d5..d1574b4198 100644 --- a/crates/server-config/src/defaults.rs +++ b/crates/server-config/src/defaults.rs @@ -49,7 +49,8 @@ pub fn default_socket_timeout() -> Duration { } pub fn default_connection_idle_timeout() -> Duration { - Duration::from_secs(10) + // 180 seconds makes sense because default Particle TTL is 120 sec, and it doesn't seem very efficient for hosts to reconnect while particle is still in flight + Duration::from_secs(180) } pub fn default_max_established_per_peer_limit() -> Option { diff --git a/crates/subnet-resolver/src/resolve.rs b/crates/subnet-resolver/src/resolve.rs index 856cb9fe6b..e50c415298 100644 --- a/crates/subnet-resolver/src/resolve.rs +++ b/crates/subnet-resolver/src/resolve.rs @@ -10,7 +10,6 @@ use chain_data::{next_opt, parse_peer_id, ChainDataError}; use hex_utils::decode_hex; use serde::{Deserialize, Serialize}; use serde_json::json; -use tokio::runtime::Handle; /// Parse data from chain. Accepts data with and without "0x" prefix. pub fn parse_chain_data(data: &str) -> Result, ChainDataError> { @@ -99,7 +98,7 @@ pub fn validate_deal_id(deal_id: String) -> Result { } } -pub fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult { +pub async fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult { let res: Result<_, ResolveSubnetError> = try { let deal_id = validate_deal_id(deal_id)?; // Description of the `getComputeUnits` function from the `chain.workers` smart contract on chain @@ -115,9 +114,7 @@ pub fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResul let input = format!("0x{}", hex::encode(input)); let client = HttpClientBuilder::default().build(api_endpoint)?; let params = rpc_params![json!({ "data": input, "to": deal_id }), json!("latest")]; - let response: Result = tokio::task::block_in_place(move || { - Handle::current().block_on(async move { client.request("eth_call", params).await }) - }); + let response = client.request("eth_call", params).await; let pats = response?; diff --git a/crates/test-constants/src/lib.rs b/crates/test-constants/src/lib.rs index 94fdd1634a..69ea0822b4 100644 --- a/crates/test-constants/src/lib.rs +++ b/crates/test-constants/src/lib.rs @@ -35,6 +35,6 @@ pub static TIMEOUT: Duration = Duration::from_secs(15); pub static SHORT_TIMEOUT: Duration = Duration::from_millis(300); pub static TRANSPORT_TIMEOUT: Duration = Duration::from_millis(500); -pub static IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30); +pub static IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); pub static EXECUTION_TIMEOUT: Duration = Duration::from_millis(5000); pub static PARTICLE_TTL: u32 = 20000; diff --git a/crates/workers/src/workers.rs b/crates/workers/src/workers.rs index 3e3889543f..cd974d9c26 100644 --- a/crates/workers/src/workers.rs +++ b/crates/workers/src/workers.rs @@ -8,7 +8,7 @@ use core_manager::types::{AcquireRequest, WorkType}; use core_manager::CUID; use fluence_libp2p::PeerId; use parking_lot::RwLock; -use tokio::runtime::{Handle, Runtime}; +use tokio::runtime::{Handle, Runtime, UnhandledPanic}; use types::peer_scope::WorkerId; use types::DealId; @@ -140,9 +140,12 @@ impl Workers { .worker_threads(threads_count) // Configuring blocking threads for handling I/O .max_blocking_threads(threads_count) + .enable_time() + .enable_io() .on_thread_start(move || { assignment.pin_current_thread(); }) + .unhandled_panic(UnhandledPanic::Ignore) // TODO: try to log panics after fix https://github.com/tokio-rs/tokio/issues/4516 .build() .map_err(|err| WorkersError::CreateRuntime { worker_id, err })?; Ok(runtime) diff --git a/nox/Cargo.toml b/nox/Cargo.toml index 2b639fcc21..b6e24abcf9 100644 --- a/nox/Cargo.toml +++ b/nox/Cargo.toml @@ -72,6 +72,7 @@ once_cell = { workspace = true } config = "0.13.4" tonic = "0.9.2" jsonrpsee = { workspace = true, features = ["ws-client", "macros"] } +tracing-panic = "0.1.1" [dev-dependencies] parking_lot = { workspace = true } diff --git a/nox/src/main.rs b/nox/src/main.rs index 8fd749d9c4..7eb1c31d1c 100644 --- a/nox/src/main.rs +++ b/nox/src/main.rs @@ -43,6 +43,7 @@ use core_manager::manager::{CoreManager, CoreManagerFunctions, PersistentCoreMan use fs_utils::to_abs_path; use nox::{env_filter, log_layer, tokio_console_layer, tracing_layer, Node}; use server_config::{load_config, ConfigData, ResolvedConfig}; +use tracing_panic::panic_hook; const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -61,6 +62,12 @@ fn main() -> eyre::Result<()> { #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::new_heap(); + let prev_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic_info| { + panic_hook(panic_info); + prev_hook(panic_info); + })); + let version = format!("{}; AIR version {}", VERSION, air_interpreter_wasm::VERSION); let authors = format!("by {AUTHORS}"); let config_data = ConfigData { diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs index 82188a860d..388fcd5a91 100644 --- a/particle-builtins/src/builtins.rs +++ b/particle-builtins/src/builtins.rs @@ -281,7 +281,7 @@ where ("vault", "put") => wrap(self.vault_put(args, particle)), ("vault", "cat") => wrap(self.vault_cat(args, particle)), - ("subnet", "resolve") => wrap(self.subnet_resolve(args)), + ("subnet", "resolve") => wrap(self.subnet_resolve(args).await), ("run-console", "print") => { let function_args = args.function_args.iter(); let decider = function_args.filter_map(JValue::as_str).any(|s| s.contains("decider")); @@ -1037,10 +1037,10 @@ where .map_err(|_| JError::new(format!("Error reading vault file `{path}`"))) } - fn subnet_resolve(&self, args: Args) -> Result { + async fn subnet_resolve(&self, args: Args) -> Result { let mut args = args.function_args.into_iter(); let deal_id: String = Args::next("deal_id", &mut args)?; - let result = subnet_resolver::resolve_subnet(deal_id, &self.connector_api_endpoint); + let result = subnet_resolver::resolve_subnet(deal_id, &self.connector_api_endpoint).await; Ok(json!(result)) } }