Skip to content

Commit

Permalink
feat(config): Introduce ephemeral and persistent storage [fixes NET-759
Browse files Browse the repository at this point in the history
NET-760] (#2091)

* feat(cores): cpu_range set

* feat(cores): core manager

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* small improvements

* pin system threads

* pin worker threads

* Fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* use CUID

* feat(config): Introduce ephemeral and persistent storage

* fix

* fix

* fix

* change layout + inject persistent dirs into services

* fix test

* Expose applied config in CreatedSwarm

* rename stepper

* fix

* fix fd in core manager

* fix fd in core manager

* fix fd in core manager

* update spell

* update spell

* shutdown workers runtime at shutdown signal

* fix

* fluence-spell-dtos update

* fluence-spell-dtos update

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* update fluence-spell-dtos

* make injecttion async

* fix(deps): decider 0.6.7

* fixes

---------

Co-authored-by: folex <0xdxdy@gmail.com>
  • Loading branch information
gurinderu and folex committed Feb 23, 2024
1 parent 938f751 commit 79e7850
Show file tree
Hide file tree
Showing 29 changed files with 815 additions and 362 deletions.
230 changes: 152 additions & 78 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ types = { path = "crates/types" }
core-manager = { path = "crates/core-manager" }

# spell
fluence-spell-dtos = "=0.7.3"
fluence-spell-distro = "=0.7.3"
fluence-spell-dtos = "=0.7.4"
fluence-spell-distro = "=0.7.4"

# marine
fluence-app-service = { version = "0.33.0" }
fluence-app-service = "0.35.0"
marine-utils = "0.5.1"
marine-it-parser = "0.15.1"
marine-it-parser = "0.16.0"

# avm
avm-server = "=0.35.0"
Expand Down
8 changes: 2 additions & 6 deletions crates/core-manager/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,8 @@ impl CoreManagerFunctions for PersistentCoreManager {
drop(lock);
let toml = toml::to_string_pretty(&persistent_state)
.map_err(|err| PersistError::SerializationError { err })?;
let exists = self.file_path.exists();
let mut file = if exists {
File::open(self.file_path.clone()).map_err(|err| PersistError::IoError { err })?
} else {
File::create(self.file_path.clone()).map_err(|err| PersistError::IoError { err })?
};
let mut file =
File::create(self.file_path.clone()).map_err(|err| PersistError::IoError { err })?;
file.write(toml.as_bytes())
.map_err(|err| PersistError::IoError { err })?;
Ok(())
Expand Down
53 changes: 42 additions & 11 deletions crates/created-swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use futures::stream::iter;
use nox::{Connectivity, Node};
use particle_protocol::ProtocolConfig;
use server_config::{
system_services_config, BootstrapConfig, ChainListenerConfig, UnresolvedConfig,
persistent_dir, system_services_config, BootstrapConfig, ChainListenerConfig, ResolvedConfig,
UnresolvedConfig,
};
use tempfile::TempDir;
use test_constants::{EXECUTION_TIMEOUT, TRANSPORT_TIMEOUT};
Expand All @@ -52,12 +53,14 @@ const HEALTH_CHECK_POLLING_INTERVAL: Duration = Duration::from_millis(100);

// default bound on the number of computations it can perform simultaneously
const DEFAULT_PARALLELISM: usize = 2;

#[allow(clippy::upper_case_acronyms)]
type AVM = aquamarine::AVMRunner;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct CreatedSwarm {
pub config: ResolvedConfig,
pub peer_id: PeerId,
pub multiaddr: Multiaddr,
// tmp dir, must be cleaned
Expand Down Expand Up @@ -157,7 +160,17 @@ where
F: (FnMut(
Vec<Multiaddr>,
Multiaddr,
) -> BoxFuture<'static, (PeerId, Box<Node<RT>>, KeyPair, SwarmConfig, Span)>)
) -> BoxFuture<
'static,
(
PeerId,
Box<Node<RT>>,
KeyPair,
SwarmConfig,
ResolvedConfig,
Span,
),
>)
+ 'static
+ Send,
M: (FnMut() -> Multiaddr) + 'static + Send,
Expand All @@ -175,7 +188,8 @@ where
let bootstraps = bootstraps(addrs);
let create_node_future = create_node(bootstraps, addr.clone());
async move {
let (peer_id, node, management_keypair, config, span) = create_node_future.await;
let (peer_id, node, management_keypair, input_config, resolved_config, span) =
create_node_future.await;
let connectivity = node.connectivity.clone();
let aquamarine_api = node.aquamarine_api.clone();
let started_node = node
Expand All @@ -187,9 +201,10 @@ where
.http_listen_addr
.expect("could not take http listen addr");
CreatedSwarm {
config: resolved_config,
peer_id,
multiaddr: config.listen_on,
tmp_dir: config.tmp_dir.clone(),
multiaddr: input_config.listen_on,
tmp_dir: input_config.tmp_dir.clone(),
management_keypair,
exit_outlet: started_node.exit_outlet,
connectivity,
Expand Down Expand Up @@ -306,7 +321,8 @@ pub fn aqua_vm_config(
peer_id, tmp_dir, ..
} = vm_config;

let air_interpreter = air_interpreter_path(&tmp_dir);
let persistent_dir = persistent_dir(&tmp_dir);
let air_interpreter = air_interpreter_path(&persistent_dir);
write_default_air_interpreter(&air_interpreter).expect("write air interpreter");

VmConfig::new(peer_id, air_interpreter, None)
Expand All @@ -315,7 +331,14 @@ pub fn aqua_vm_config(
pub async fn create_swarm_with_runtime<RT: AquaRuntime>(
config: SwarmConfig,
vm_config: impl Fn(BaseVmConfig) -> RT::Config,
) -> (PeerId, Box<Node<RT>>, KeyPair, SwarmConfig, Span) {
) -> (
PeerId,
Box<Node<RT>>,
KeyPair,
SwarmConfig,
ResolvedConfig,
Span,
) {
use serde_json::json;

let format = match &config.keypair {
Expand All @@ -332,7 +355,7 @@ pub async fn create_swarm_with_runtime<RT: AquaRuntime>(
let node_listen_span = tracing::info_span!(parent: &parent_span, "config");
let node_creation_span = tracing::info_span!(parent: &parent_span, "config");

let (node, management_kp) = config_apply_span.in_scope(||{
let (node, management_kp, resolved_config) = config_apply_span.in_scope(|| {
let tmp_dir = config.tmp_dir.path().to_path_buf();

let node_config = json!({
Expand Down Expand Up @@ -419,15 +442,15 @@ pub async fn create_swarm_with_runtime<RT: AquaRuntime>(
.extend(config.extend_system_services.clone());
let core_manager = Arc::new(DummyCoreManager::default().into());
let node = Node::new(
resolved,
resolved.clone(),
core_manager,
vm_config,
data_store_config,
"some version",
"some version",
system_service_distros,
);
(node, management_kp)
(node, management_kp, resolved)
});

let mut node = node
Expand All @@ -442,13 +465,21 @@ pub async fn create_swarm_with_runtime<RT: AquaRuntime>(
node,
management_kp,
config,
resolved_config,
parent_span.clone(),
)
})
}

pub async fn create_swarm(
config: SwarmConfig,
) -> (PeerId, Box<Node<AVMRunner>>, KeyPair, SwarmConfig, Span) {
) -> (
PeerId,
Box<Node<AVMRunner>>,
KeyPair,
SwarmConfig,
ResolvedConfig,
Span,
) {
create_swarm_with_runtime(config, aqua_vm_config).await
}
Loading

0 comments on commit 79e7850

Please sign in to comment.