Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(workers): add io+timer drivers to tokio runtimes [fixes NET-795] #2135

Merged
merged 13 commits into from
Mar 7, 2024
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/created-swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use server_config::{
UnresolvedConfig,
};
use tempfile::TempDir;
use test_constants::{EXECUTION_TIMEOUT, TRANSPORT_TIMEOUT};
use test_constants::{EXECUTION_TIMEOUT, IDLE_CONNECTION_TIMEOUT, TRANSPORT_TIMEOUT};
use tokio::sync::oneshot;
use toy_vms::EasyVM;
use tracing::{Instrument, Span};
Expand Down Expand Up @@ -403,6 +403,7 @@ pub async fn create_swarm_with_runtime<RT: AquaRuntime>(

resolved.node_config.aquavm_pool_size = config.pool_size.unwrap_or(1);
resolved.node_config.particle_execution_timeout = EXECUTION_TIMEOUT;
resolved.node_config.transport_config.connection_idle_timeout = IDLE_CONNECTION_TIMEOUT;

let allowed_effectors = config.allowed_effectors.iter().map(|(cid, binaries)| {
(Hash::from_string(cid).unwrap(), binaries.clone())
Expand Down
54 changes: 52 additions & 2 deletions crates/nox-tests/tests/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use connected_client::ConnectedClient;
use created_swarm::make_swarms;
use eyre::Context;
use hex::FromHex;
use log_utils::enable_logs;
use maplit::hashmap;
use serde_json::json;
use serde_json::{json, Value};
use workers::CUID;

async fn create_worker(client: &mut ConnectedClient, deal_id: &str) -> String {
pub(crate) async fn create_worker(client: &mut ConnectedClient, deal_id: &str) -> String {
let init_id_1 =
<CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
.unwrap();
Expand Down Expand Up @@ -116,3 +117,52 @@ async fn test_worker_different_deal_ids() {
assert!(is_worker_active(&mut client, deal_id_lowercase_prefix).await);
assert!(is_worker_active(&mut client, deal_id_mixed_prefix).await);
}

// Integration test: runs the `test_subnet_resolve_on_worker.air` script through a client
// connected to the swarm returned by `make_swarms(1)` and compares the particle result
// with a fixed expected value.
#[tokio::test]
async fn test_resolve_subnet_on_worker() {
// NOTE(review): the two lines below are review-thread markers captured from the page, not Rust code.
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
let deal_id = "0x9DcaFca9B88f49d91c38a32E7d9A86a7d9a37B04";

enable_logs();
// Load the AIR script from disk and decode it as UTF-8; a failure in either step panics the test.
let script = tokio::fs::read("./tests/workers/test_subnet_resolve_on_worker.air")
.await
.wrap_err("read test data")
.unwrap();
let script = String::from_utf8(script)
.wrap_err("decode test data")
.unwrap();

let swarms = make_swarms(1).await;

// Connect a client to the first swarm entry; a connection failure panics the test.
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.await
.wrap_err("connect client")
.unwrap();

// Create a worker for `deal_id`; its id is handed to the script below.
let worker_id = create_worker(&mut client, deal_id).await;

// Values passed alongside the script; the keys match the `-relay-`, `-worker_id-`
// and `-deal_id-` names used in the .air script.
let data = hashmap! {
"-relay-" => json!(swarms[0].peer_id.to_string()),
"-worker_id-" => json!(worker_id),
"-deal_id-" => json!(deal_id),
};

let result = client
.execute_particle(script.clone(), data.clone())
.await
.wrap_err("execute particle")
.unwrap();

// Expected result: a single object whose `error` array holds one JSON-RPC 429 message,
// with `success` = false and an empty `workers` array.
// NOTE(review): the exact 429 error text presumably depends on the endpoint the test
// swarm is configured with — confirm this is stable across environments.
let expected = {
let error = Value::Array(
vec![Value::String("error sending jsonrpc request: 'Networking or low-level protocol error: Server returned an error status code: 429'".to_string())]
);
let mut object_map = serde_json::Map::new();
object_map.insert("error".to_string(), error);
object_map.insert("success".to_string(), Value::Bool(false));
object_map.insert("workers".to_string(), Value::Array(vec![]));

vec![Value::Object(object_map)]
};

assert_eq!(result, expected)
}
13 changes: 13 additions & 0 deletions crates/nox-tests/tests/workers/test_subnet_resolve_on_worker.air
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
; Script used by `test_resolve_subnet_on_worker`: calls ("subnet" "resolve") on the worker
; and returns the result to the initiating peer.
; `-relay-`, `-worker_id-` and `-deal_id-` are supplied by the test alongside the script.
(xor
(seq
; no-op call on the relay peer
(call -relay- ("op" "noop") [])
(seq
; no-op call on the worker peer
(call -worker_id- ("op" "noop") [])
(seq
; resolve the subnet for the deal on the worker; the output is bound to `subnet`
(call -worker_id- ("subnet" "resolve") [-deal_id-] subnet)
; hand `subnet` back to the peer that started the particle
(call %init_peer_id% ("op" "return") [subnet])
)
)
)
; second `xor` branch (error path): return the `instruction` field of %last_error%
(call %init_peer_id% ("op" "return") [%last_error%.$.instruction])
)
2 changes: 1 addition & 1 deletion crates/server-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub fn default_socket_timeout() -> Duration {
}

pub fn default_connection_idle_timeout() -> Duration {
Duration::from_secs(10)
Duration::from_secs(180)
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn default_max_established_per_peer_limit() -> Option<u32> {
Expand Down
7 changes: 2 additions & 5 deletions crates/subnet-resolver/src/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use chain_data::{next_opt, parse_peer_id, ChainDataError};
use hex_utils::decode_hex;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::runtime::Handle;

/// Parse data from chain. Accepts data with and without "0x" prefix.
pub fn parse_chain_data(data: &str) -> Result<Vec<Token>, ChainDataError> {
Expand Down Expand Up @@ -99,7 +98,7 @@ pub fn validate_deal_id(deal_id: String) -> Result<String, ResolveSubnetError> {
}
}

pub fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult {
pub async fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult {
let res: Result<_, ResolveSubnetError> = try {
let deal_id = validate_deal_id(deal_id)?;
// Description of the `getComputeUnits` function from the `chain.workers` smart contract on chain
Expand All @@ -115,9 +114,7 @@ pub fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResul
let input = format!("0x{}", hex::encode(input));
let client = HttpClientBuilder::default().build(api_endpoint)?;
let params = rpc_params![json!({ "data": input, "to": deal_id }), json!("latest")];
let response: Result<String, _> = tokio::task::block_in_place(move || {
Handle::current().block_on(async move { client.request("eth_call", params).await })
});
let response = client.request("eth_call", params).await;
gurinderu marked this conversation as resolved.
Show resolved Hide resolved

let pats = response?;

Expand Down
2 changes: 1 addition & 1 deletion crates/test-constants/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ pub static TIMEOUT: Duration = Duration::from_secs(15);

pub static SHORT_TIMEOUT: Duration = Duration::from_millis(300);
pub static TRANSPORT_TIMEOUT: Duration = Duration::from_millis(500);
pub static IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
pub static IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
pub static EXECUTION_TIMEOUT: Duration = Duration::from_millis(5000);
pub static PARTICLE_TTL: u32 = 20000;
5 changes: 4 additions & 1 deletion crates/workers/src/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use core_manager::types::{AcquireRequest, WorkType};
use core_manager::CUID;
use fluence_libp2p::PeerId;
use parking_lot::RwLock;
use tokio::runtime::{Handle, Runtime};
use tokio::runtime::{Handle, Runtime, UnhandledPanic};
use types::peer_scope::WorkerId;
use types::DealId;

Expand Down Expand Up @@ -140,9 +140,12 @@ impl Workers {
.worker_threads(threads_count)
// Configuring blocking threads for handling I/O
.max_blocking_threads(threads_count)
.enable_time()
.enable_io()
.on_thread_start(move || {
assignment.pin_current_thread();
})
.unhandled_panic(UnhandledPanic::Ignore) // TODO: try to log panics after fix https://github.com/tokio-rs/tokio/issues/4516
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
.build()
.map_err(|err| WorkersError::CreateRuntime { worker_id, err })?;
Ok(runtime)
Expand Down
1 change: 1 addition & 0 deletions nox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ once_cell = { workspace = true }
config = "0.13.4"
tonic = "0.9.2"
jsonrpsee = { workspace = true, features = ["ws-client", "macros"] }
tracing-panic = "0.1.1"

[dev-dependencies]
parking_lot = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions nox/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use core_manager::manager::{CoreManager, CoreManagerFunctions, PersistentCoreMan
use fs_utils::to_abs_path;
use nox::{env_filter, log_layer, tokio_console_layer, tracing_layer, Node};
use server_config::{load_config, ConfigData, ResolvedConfig};
use tracing_panic::panic_hook;

const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
Expand All @@ -61,6 +62,12 @@ fn main() -> eyre::Result<()> {
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

let prev_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
panic_hook(panic_info);
prev_hook(panic_info);
}));

let version = format!("{}; AIR version {}", VERSION, air_interpreter_wasm::VERSION);
let authors = format!("by {AUTHORS}");
let config_data = ConfigData {
Expand Down
6 changes: 3 additions & 3 deletions particle-builtins/src/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ where
("vault", "put") => wrap(self.vault_put(args, particle)),
("vault", "cat") => wrap(self.vault_cat(args, particle)),

("subnet", "resolve") => wrap(self.subnet_resolve(args)),
("subnet", "resolve") => wrap(self.subnet_resolve(args).await),
("run-console", "print") => {
let function_args = args.function_args.iter();
let decider = function_args.filter_map(JValue::as_str).any(|s| s.contains("decider"));
Expand Down Expand Up @@ -1037,10 +1037,10 @@ where
.map_err(|_| JError::new(format!("Error reading vault file `{path}`")))
}

fn subnet_resolve(&self, args: Args) -> Result<JValue, JError> {
async fn subnet_resolve(&self, args: Args) -> Result<JValue, JError> {
let mut args = args.function_args.into_iter();
let deal_id: String = Args::next("deal_id", &mut args)?;
let result = subnet_resolver::resolve_subnet(deal_id, &self.connector_api_endpoint);
let result = subnet_resolver::resolve_subnet(deal_id, &self.connector_api_endpoint).await;
Ok(json!(result))
}
}
Expand Down
Loading