From a62086ade37223eb4c79847fc09a91be2137be9c Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Fri, 6 Dec 2019 18:25:37 +0100 Subject: [PATCH 01/13] Add StatsSignal that gets send every 500ms over all admin interfaces --- crates/conductor_lib/src/conductor/base.rs | 54 +++++++++++++++++++++- crates/core/src/context.rs | 16 +++++++ crates/core/src/signal.rs | 17 +++++++ 3 files changed, 86 insertions(+), 1 deletion(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index e73016e534..ba3d9c60a4 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -61,6 +61,10 @@ use holochain_net::{ p2p_config::{BackendConfig, P2pBackendKind, P2pConfig}, p2p_network::P2pNetwork, }; +use holochain_core::signal::{StatsSignal, InstanceStats}; +use std::time::Instant; + +const STATS_SIGNAL_INTERVAL: Duration = Duration::from_millis(500); lazy_static! { /// This is a global and mutable Conductor singleton. @@ -262,21 +266,23 @@ impl Conductor { self.stop_signal_multiplexer(); let broadcasters = self.interface_broadcasters.clone(); let instance_signal_receivers = self.instance_signal_receivers.clone(); + let instances = self.instances.clone(); let signal_tx = self.signal_tx.clone(); let config = self.config.clone(); let (kill_switch_tx, kill_switch_rx) = unbounded(); self.signal_multiplexer_kill_switch = Some(kill_switch_tx); + let mut last_stats_signal_instant = Instant::now(); debug!("starting signal loop"); thread::Builder::new() .name("signal_multiplexer".to_string()) .spawn(move || loop { + let broadcasters = broadcasters.read().unwrap(); { for (instance_id, receiver) in instance_signal_receivers.read().unwrap().iter() { if let Ok(signal) = receiver.try_recv() { signal_tx.clone().map(|s| s.send(signal.clone())); - let broadcasters = broadcasters.read().unwrap(); let interfaces_with_instance: Vec<&InterfaceConfiguration> = match signal { // Send internal signals only to admin interfaces, if signals.trace is set: @@ -328,6 +334,8 @@ impl Conductor { println!("INTERFACEs for SIGNAL: {:?}", interfaces); interfaces } + + Signal::Stats(_) => unreachable!(), }; for interface in interfaces_with_instance { @@ -343,6 +351,50 @@ impl Conductor { } } } + + if last_stats_signal_instant.elapsed() > STATS_SIGNAL_INTERVAL { + let admin_interfaces = config + .interfaces + .iter() + .filter(|interface_config| interface_config.admin) + .collect::>(); + + if admin_interfaces.len() > 0 { + // Get stats for all instances: + let mut instance_stats: HashMap = HashMap::new(); + for (id, instance) in instances.iter() { + if let Err(error) = instance.read() + .map_err(|_| HolochainInstanceError::InternalFailure(HolochainError::new("Could not get lock on instance"))) + .and_then(|instance| instance.context()) + .and_then(|context| context.get_stats().map_err(|e| e.into())) + .and_then(|stats| { + instance_stats.insert(id.clone(), stats); + Ok(()) + }) + { + error!("Could not get stats for instance '{}'. Error: {:?}", id, error); + } + } + + // Wrap stats in signal: + let stats_signal = Signal::Stats(StatsSignal{instance_stats}); + + // Send signal over admin interfaces: + for interface in admin_interfaces { + if let Some(broadcaster) = broadcasters.get(&interface.id) { + if let Err(error) = broadcaster.send(SignalWrapper { + signal: stats_signal.clone(), + instance_id: String::new(), + }) { + notify(error.to_string()); + } + }; + } + } + last_stats_signal_instant = Instant::now(); + } + + if kill_switch_rx.try_recv().is_ok() { break; } diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index c27b996083..0416872f71 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -47,6 +47,7 @@ use std::{ #[cfg(test)] use test_utils::mock_signing::mock_conductor_api; +use crate::signal::InstanceStats; pub struct P2pNetworkWrapper(Arc>>); @@ -389,6 +390,21 @@ impl Context { "No public CapTokenGrant entry type in chain".into(), )) } + + pub fn get_stats(&self) -> HcResult { + let state = self + .state() + .ok_or_else(|| "Couldn't get instance state".to_string())?; + let dht_store = state.dht(); + let holding_map = dht_store.get_holding_map().bare(); + Ok(InstanceStats{ + number_held_entries: holding_map.keys().count(), + number_held_aspects: holding_map.values().fold(0, |acc, aspect_set| acc + aspect_set.len()), + number_pending_validations: dht_store.queued_holding_workflows().len(), + number_running_zome_calls: state.nucleus().running_zome_calls.len(), + offline: false, + }) + } } pub async fn get_dna_and_agent(context: &Arc) -> HcResult<(Address, String)> { diff --git a/crates/core/src/signal.rs b/crates/core/src/signal.rs index cf9114e88c..009c9cc392 100644 --- a/crates/core/src/signal.rs +++ b/crates/core/src/signal.rs @@ -5,6 +5,7 @@ use holochain_wasm_utils::api_serialization::emit_signal::EmitSignalArgs; use serde::{Deserialize, Deserializer}; use snowflake::ProcessUniqueId; use std::thread; +use std::collections::HashMap; #[derive(Clone, Debug, Serialize, DefaultJson)] #[serde(tag = "signal_type")] @@ -13,6 +14,7 @@ pub enum Signal { Trace(ActionWrapper), Consistency(ConsistencySignal), User(UserSignal), + Stats(StatsSignal), } #[derive(Clone, Debug, Serialize, Deserialize, DefaultJson, PartialEq)] @@ -30,6 +32,21 @@ impl From for UserSignal { } } +#[derive(Clone, Debug, Serialize, Deserialize, DefaultJson, PartialEq)] +pub struct StatsSignal { + pub instance_stats: HashMap, +} + +#[derive(Clone, Debug, Serialize, Deserialize, DefaultJson, PartialEq)] +pub struct InstanceStats { + pub number_held_entries: usize, + pub number_held_aspects: usize, + pub number_pending_validations: usize, + pub number_running_zome_calls: usize, + pub offline: bool, +} + + impl<'de> Deserialize<'de> for Signal { fn deserialize(_deserializer: D) -> Result where From 003e956c9b397a76aa4cb40b53077e579b87731f Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Fri, 6 Dec 2019 18:25:55 +0100 Subject: [PATCH 02/13] rustfmt --- crates/conductor_lib/src/conductor/base.rs | 19 +++++++++++++------ crates/core/src/context.rs | 8 +++++--- crates/core/src/signal.rs | 4 +--- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index ba3d9c60a4..8bd5e5b639 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -54,6 +54,7 @@ use crate::{ static_server_impls::NickelStaticServer as StaticServer, }; use boolinator::Boolinator; +use holochain_core::signal::{InstanceStats, StatsSignal}; use holochain_core_types::dna::bridges::BridgePresence; use holochain_net::{ connection::net_connection::NetHandler, @@ -61,7 +62,6 @@ use holochain_net::{ p2p_config::{BackendConfig, P2pBackendKind, P2pConfig}, p2p_network::P2pNetwork, }; -use holochain_core::signal::{StatsSignal, InstanceStats}; use std::time::Instant; const STATS_SIGNAL_INTERVAL: Duration = Duration::from_millis(500); @@ -363,8 +363,13 @@ impl Conductor { // Get stats for all instances: let mut instance_stats: HashMap = HashMap::new(); for (id, instance) in instances.iter() { - if let Err(error) = instance.read() - .map_err(|_| HolochainInstanceError::InternalFailure(HolochainError::new("Could not get lock on instance"))) + if let Err(error) = instance + .read() + .map_err(|_| { + HolochainInstanceError::InternalFailure(HolochainError::new( + "Could not get lock on instance", + )) + }) .and_then(|instance| instance.context()) .and_then(|context| context.get_stats().map_err(|e| e.into())) .and_then(|stats| { @@ -372,12 +377,15 @@ impl Conductor { Ok(()) }) { - error!("Could not get stats for instance '{}'. Error: {:?}", id, error); + error!( + "Could not get stats for instance '{}'. Error: {:?}", + id, error + ); } } // Wrap stats in signal: - let stats_signal = Signal::Stats(StatsSignal{instance_stats}); + let stats_signal = Signal::Stats(StatsSignal { instance_stats }); // Send signal over admin interfaces: for interface in admin_interfaces { @@ -394,7 +402,6 @@ impl Conductor { last_stats_signal_instant = Instant::now(); } - if kill_switch_rx.try_recv().is_ok() { break; } diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 0416872f71..be8a097097 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -45,9 +45,9 @@ use std::{ time::Duration, }; +use crate::signal::InstanceStats; #[cfg(test)] use test_utils::mock_signing::mock_conductor_api; -use crate::signal::InstanceStats; pub struct P2pNetworkWrapper(Arc>>); @@ -397,9 +397,11 @@ impl Context { .ok_or_else(|| "Couldn't get instance state".to_string())?; let dht_store = state.dht(); let holding_map = dht_store.get_holding_map().bare(); - Ok(InstanceStats{ + Ok(InstanceStats { number_held_entries: holding_map.keys().count(), - number_held_aspects: holding_map.values().fold(0, |acc, aspect_set| acc + aspect_set.len()), + number_held_aspects: holding_map + .values() + .fold(0, |acc, aspect_set| acc + aspect_set.len()), number_pending_validations: dht_store.queued_holding_workflows().len(), number_running_zome_calls: state.nucleus().running_zome_calls.len(), offline: false, diff --git a/crates/core/src/signal.rs b/crates/core/src/signal.rs index 009c9cc392..d19721f9dc 100644 --- a/crates/core/src/signal.rs +++ b/crates/core/src/signal.rs @@ -4,8 +4,7 @@ use holochain_json_api::{error::JsonError, json::JsonString}; use holochain_wasm_utils::api_serialization::emit_signal::EmitSignalArgs; use serde::{Deserialize, Deserializer}; use snowflake::ProcessUniqueId; -use std::thread; -use std::collections::HashMap; +use std::{collections::HashMap, thread}; #[derive(Clone, Debug, Serialize, DefaultJson)] #[serde(tag = "signal_type")] @@ -46,7 +45,6 @@ pub struct InstanceStats { pub offline: bool, } - impl<'de> Deserialize<'de> for Signal { fn deserialize(_deserializer: D) -> Result where From 608721ffb334a5b0f2cb30e9dbe76ee3a10cb187 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 9 Dec 2019 19:20:15 +0100 Subject: [PATCH 03/13] Start signal multiplexer after instances got initialized so we have all in that thread --- crates/conductor_lib/src/conductor/base.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index 8bd5e5b639..9c1e5ac641 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -760,8 +760,7 @@ impl Conductor { if self.p2p_config.is_none() { self.p2p_config = Some(self.initialize_p2p_config()); } - - self.start_signal_multiplexer(); + self.dpki_bootstrap()?; for id in self.config.instance_ids_sorted_by_bridge_dependencies()? { @@ -781,6 +780,8 @@ impl Conductor { } } + self.start_signal_multiplexer(); + for ui_interface_config in self.config.ui_interfaces.clone() { notify(format!("adding ui interface {}", &ui_interface_config.id)); let bundle_config = self From 993ca00b95b980975f4ffe1a686775cf618140e9 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 9 Dec 2019 23:53:45 +0100 Subject: [PATCH 04/13] Add parameters to conductor API call `debug/state_dump` to exlude portions that can be big --- crates/conductor_lib/src/interface.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/crates/conductor_lib/src/interface.rs b/crates/conductor_lib/src/interface.rs index cf8354b784..36b28489fd 100644 --- a/crates/conductor_lib/src/interface.rs +++ b/crates/conductor_lib/src/interface.rs @@ -742,6 +742,9 @@ impl ConductorApiBuilder { /// Returns a JSON object with all relevant fields of an instance's state. /// Params: /// - `instance_id` ID of the instance of which the state is requested + /// - `source_chain` [bool] (optional) If set to false, will exclude source chain headers + /// - `held_aspects` [bool] (optional) If set to false, will exclude the holding map entries + /// - `queued_holding_workflows` [bool] (optional If set to false, will exclude contents of the validatio queue /// /// - `debug/fetch_cas` /// Returns content of a given instance's CAS. @@ -762,7 +765,20 @@ impl ConductorApiBuilder { let params_map = Self::unwrap_params_map(params)?; let instance_id = Self::get_as_string("instance_id", ¶ms_map)?; - let dump = conductor_call!(|c| c.state_dump_for_instance(&instance_id))?; + + let mut dump = conductor_call!(|c| c.state_dump_for_instance(&instance_id))?; + + if Ok(false) == Self::get_as_bool("source_chain", ¶ms_map) { + dump.source_chain.clear() + } + + if Ok(false) == Self::get_as_bool("held_aspects", ¶ms_map) { + dump.held_aspects.clear() + } + + if Ok(false) == Self::get_as_bool("queued_holding_workflows", ¶ms_map) { + dump.queued_holding_workflows.clear() + } Ok(serde_json::to_value(dump) .map_err(|e| jsonrpc_core::Error::invalid_params(e.to_string()))?) From 7e42f373c703e044287967b2b66994c0c3bd1afb Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 9 Dec 2019 23:54:09 +0100 Subject: [PATCH 05/13] rustfmt --- crates/conductor_lib/src/conductor/base.rs | 2 +- crates/conductor_lib/src/interface.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index 9c1e5ac641..4efdb8209d 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -760,7 +760,7 @@ impl Conductor { if self.p2p_config.is_none() { self.p2p_config = Some(self.initialize_p2p_config()); } - + self.dpki_bootstrap()?; for id in self.config.instance_ids_sorted_by_bridge_dependencies()? { diff --git a/crates/conductor_lib/src/interface.rs b/crates/conductor_lib/src/interface.rs index 36b28489fd..6a3eb10cc0 100644 --- a/crates/conductor_lib/src/interface.rs +++ b/crates/conductor_lib/src/interface.rs @@ -765,7 +765,6 @@ impl ConductorApiBuilder { let params_map = Self::unwrap_params_map(params)?; let instance_id = Self::get_as_string("instance_id", ¶ms_map)?; - let mut dump = conductor_call!(|c| c.state_dump_for_instance(&instance_id))?; if Ok(false) == Self::get_as_bool("source_chain", ¶ms_map) { From e56dbec1ea40f31f68c3d38971249aadad5cebe3 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Tue, 10 Dec 2019 16:49:49 +0100 Subject: [PATCH 06/13] Update crates/conductor_lib/src/interface.rs Co-Authored-By: Eric Harris-Braun --- crates/conductor_lib/src/interface.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/conductor_lib/src/interface.rs b/crates/conductor_lib/src/interface.rs index d9d991cd8b..57a03890a7 100644 --- a/crates/conductor_lib/src/interface.rs +++ b/crates/conductor_lib/src/interface.rs @@ -745,7 +745,7 @@ impl ConductorApiBuilder { /// - `instance_id` ID of the instance of which the state is requested /// - `source_chain` [bool] (optional) If set to false, will exclude source chain headers /// - `held_aspects` [bool] (optional) If set to false, will exclude the holding map entries - /// - `queued_holding_workflows` [bool] (optional If set to false, will exclude contents of the validatio queue + /// - `queued_holding_workflows` [bool] (optional If set to false, will exclude contents of the validation queue /// /// - `debug/fetch_cas` /// Returns content of a given instance's CAS. From 3a85fcfd73592c5029545879b18dd564b2602be3 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 11 Dec 2019 18:12:10 +0100 Subject: [PATCH 07/13] Update crates/conductor_lib/src/conductor/base.rs Co-Authored-By: Purple Hair Rust Bard --- crates/conductor_lib/src/conductor/base.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index ec0b334186..9b29fbad0d 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -372,7 +372,7 @@ impl Conductor { .filter(|interface_config| interface_config.admin) .collect::>(); - if admin_interfaces.len() > 0 { + if !admin_interfaces.is_empty() { // Get stats for all instances: let mut instance_stats: HashMap = HashMap::new(); for (id, instance) in instances.iter() { From 6e168a04a6da6509e95633f5b08a62e3d44e526f Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 11 Dec 2019 19:34:12 +0100 Subject: [PATCH 08/13] Replace unreachable! with panic with message --- crates/conductor_lib/src/conductor/base.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index 9b29fbad0d..344cb6cfbe 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -348,7 +348,7 @@ impl Conductor { interfaces } - Signal::Stats(_) => unreachable!(), + Signal::Stats(_) => panic!("Signal::Stats is a special case that should not get emitted from instances but gets created in the conductor to send stats for all instances in a single signal."), }; for interface in interfaces_with_instance { From d934731dcbb9fcaca05f474cea38e5d2fb56c16a Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 11 Dec 2019 21:47:12 +0100 Subject: [PATCH 09/13] Pull stats signal processing into its own thread --- crates/conductor_lib/src/conductor/base.rs | 137 ++++++++++++--------- 1 file changed, 80 insertions(+), 57 deletions(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index 344cb6cfbe..fea90bd64d 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -63,9 +63,6 @@ use holochain_net::{ p2p_config::{BackendConfig, P2pBackendKind, P2pConfig}, p2p_network::P2pNetwork, }; -use std::time::Instant; - -const STATS_SIGNAL_INTERVAL: Duration = Duration::from_millis(500); pub const MAX_DYNAMIC_PORT: u16 = std::u16::MAX; @@ -121,6 +118,8 @@ pub struct Conductor { pub(in crate::conductor) interface_threads: HashMap>, pub(in crate::conductor) interface_broadcasters: Arc>>, signal_multiplexer_kill_switch: Option>, + stats_thread_kill_switch: Option>, + stats_signal_receiver: Option>, pub key_loader: KeyLoader, pub(in crate::conductor) dna_loader: DnaLoader, pub(in crate::conductor) ui_dir_copier: UiDirCopier, @@ -233,6 +232,8 @@ impl Conductor { static_servers: HashMap::new(), interface_broadcasters: Arc::new(RwLock::new(HashMap::new())), signal_multiplexer_kill_switch: None, + stats_thread_kill_switch: None, + stats_signal_receiver: None, config, key_loader: Arc::new(Box::new(Self::load_key)), dna_loader: Arc::new(Box::new(Self::load_dna)), @@ -247,6 +248,55 @@ impl Conductor { } } + pub fn spawn_stats_thread(&mut self) { + self.stop_stats_thread(); + let instances = self.instances.clone(); + let (kill_switch_tx, kill_switch_rx) = unbounded(); + let (stats_tx, stats_rx) = unbounded(); + self.stats_thread_kill_switch = Some(kill_switch_tx); + self.stats_signal_receiver = Some(stats_rx); + thread::Builder::new() + .name("stats".to_string()) + .spawn(move || loop { + // Get stats for all instances: + let mut instance_stats: HashMap = HashMap::new(); + for (id, instance) in instances.iter() { + if let Err(error) = instance + .read() + .map_err(|_| { + HolochainInstanceError::InternalFailure(HolochainError::new( + "Could not get lock on instance", + )) + }) + .and_then(|instance| instance.context()) + .and_then(|context| context.get_stats().map_err(|e| e.into())) + .and_then(|stats| { + instance_stats.insert(id.clone(), stats); + Ok(()) + }) + { + error!( + "Could not get stats for instance '{}'. Error: {:?}", + id, error + ); + } + } + + // Wrap stats in signal: + let stats_signal = Signal::Stats(StatsSignal { instance_stats }); + + if let Err(e) = stats_tx.send(stats_signal) { + error!("Could not send stats signal over channel: {:?}", e); + } + + if kill_switch_rx.try_recv().is_ok() { + break; + } + thread::sleep(Duration::from_millis(500)); + }) + .expect(""); + } + pub fn add_agent_keystore(&mut self, agent_id: String, keystore: Keystore) { self.agent_keys .insert(agent_id, Arc::new(Mutex::new(keystore))); @@ -279,18 +329,25 @@ impl Conductor { self.stop_signal_multiplexer(); let broadcasters = self.interface_broadcasters.clone(); let instance_signal_receivers = self.instance_signal_receivers.clone(); - let instances = self.instances.clone(); let signal_tx = self.signal_tx.clone(); let config = self.config.clone(); let (kill_switch_tx, kill_switch_rx) = unbounded(); self.signal_multiplexer_kill_switch = Some(kill_switch_tx); - let mut last_stats_signal_instant = Instant::now(); + self.spawn_stats_thread(); + let stats_signal_receiver = self.stats_signal_receiver.clone().expect( + "Receiver must be Some after calling spawn_stats_thread() above which should set it", + ); debug!("starting signal loop"); thread::Builder::new() .name("signal_multiplexer".to_string()) .spawn(move || loop { let broadcasters = broadcasters.read().unwrap(); + let admin_interfaces = config + .interfaces + .iter() + .filter(|interface_config| interface_config.admin) + .collect::>(); { for (instance_id, receiver) in instance_signal_receivers.read().unwrap().iter() { @@ -301,11 +358,7 @@ impl Conductor { // Send internal signals only to admin interfaces, if signals.trace is set: Signal::Trace(_) => { if config.signals.trace { - config - .interfaces - .iter() - .filter(|interface_config| interface_config.admin) - .collect() + admin_interfaces.clone() } else { Vec::new() } @@ -365,64 +418,34 @@ impl Conductor { } } - if last_stats_signal_instant.elapsed() > STATS_SIGNAL_INTERVAL { - let admin_interfaces = config - .interfaces - .iter() - .filter(|interface_config| interface_config.admin) - .collect::>(); - - if !admin_interfaces.is_empty() { - // Get stats for all instances: - let mut instance_stats: HashMap = HashMap::new(); - for (id, instance) in instances.iter() { - if let Err(error) = instance - .read() - .map_err(|_| { - HolochainInstanceError::InternalFailure(HolochainError::new( - "Could not get lock on instance", - )) - }) - .and_then(|instance| instance.context()) - .and_then(|context| context.get_stats().map_err(|e| e.into())) - .and_then(|stats| { - instance_stats.insert(id.clone(), stats); - Ok(()) - }) - { - error!( - "Could not get stats for instance '{}'. Error: {:?}", - id, error - ); + // Process stats signals and send them over admin interfaces: + while let Ok(stats_signal) = stats_signal_receiver.try_recv() { + for interface in &admin_interfaces { + if let Some(broadcaster) = broadcasters.get(&interface.id) { + if let Err(error) = broadcaster.send(SignalWrapper { + signal: stats_signal.clone(), + instance_id: String::new(), + }) { + notify(error.to_string()); } - } - - // Wrap stats in signal: - let stats_signal = Signal::Stats(StatsSignal { instance_stats }); - - // Send signal over admin interfaces: - for interface in admin_interfaces { - if let Some(broadcaster) = broadcasters.get(&interface.id) { - if let Err(error) = broadcaster.send(SignalWrapper { - signal: stats_signal.clone(), - instance_id: String::new(), - }) { - notify(error.to_string()); - } - }; - } + }; } - last_stats_signal_instant = Instant::now(); } if kill_switch_rx.try_recv().is_ok() { break; } - thread::sleep(Duration::from_millis(1)); + thread::yield_now(); }) .expect("Must be able to spawn thread") } + pub fn stop_stats_thread(&self) { + self.stats_thread_kill_switch + .as_ref() + .map(|kill_switch| kill_switch.send(())); + } + pub fn stop_signal_multiplexer(&self) { self.signal_multiplexer_kill_switch .as_ref() From d974339b47e33616f7712e7d2bc1641f778397d1 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 11 Dec 2019 22:00:38 +0100 Subject: [PATCH 10/13] Make SignalWrapper an enum and represent InstanceStats there. Move run-time panic check to compile-time type power. --- crates/conductor_lib/src/conductor/base.rs | 28 ++++++++++------------ crates/conductor_lib/src/signal_wrapper.rs | 17 +++++++++---- crates/core/src/context.rs | 11 ++++++++- crates/core/src/signal.rs | 17 +------------ 4 files changed, 35 insertions(+), 38 deletions(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index fea90bd64d..ece84a98e0 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -55,7 +55,7 @@ use crate::{ static_server_impls::NickelStaticServer as StaticServer, }; use boolinator::Boolinator; -use holochain_core::signal::{InstanceStats, StatsSignal}; +use holochain_core::context::InstanceStats; use holochain_core_types::dna::bridges::BridgePresence; use holochain_net::{ connection::net_connection::NetHandler, @@ -119,7 +119,7 @@ pub struct Conductor { pub(in crate::conductor) interface_broadcasters: Arc>>, signal_multiplexer_kill_switch: Option>, stats_thread_kill_switch: Option>, - stats_signal_receiver: Option>, + stats_signal_receiver: Option>>, pub key_loader: KeyLoader, pub(in crate::conductor) dna_loader: DnaLoader, pub(in crate::conductor) ui_dir_copier: UiDirCopier, @@ -282,10 +282,7 @@ impl Conductor { } } - // Wrap stats in signal: - let stats_signal = Signal::Stats(StatsSignal { instance_stats }); - - if let Err(e) = stats_tx.send(stats_signal) { + if let Err(e) = stats_tx.send(instance_stats) { error!("Could not send stats signal over channel: {:?}", e); } @@ -400,16 +397,16 @@ impl Conductor { println!("INTERFACEs for SIGNAL: {:?}", interfaces); interfaces } - - Signal::Stats(_) => panic!("Signal::Stats is a special case that should not get emitted from instances but gets created in the conductor to send stats for all instances in a single signal."), }; for interface in interfaces_with_instance { if let Some(broadcaster) = broadcasters.get(&interface.id) { - if let Err(error) = broadcaster.send(SignalWrapper { - signal: signal.clone(), - instance_id: instance_id.clone(), - }) { + if let Err(error) = + broadcaster.send(SignalWrapper::InstanceSignal { + signal: signal.clone(), + instance_id: instance_id.clone(), + }) + { notify(error.to_string()); } }; @@ -419,12 +416,11 @@ impl Conductor { } // Process stats signals and send them over admin interfaces: - while let Ok(stats_signal) = stats_signal_receiver.try_recv() { + while let Ok(instance_stats) = stats_signal_receiver.try_recv() { for interface in &admin_interfaces { if let Some(broadcaster) = broadcasters.get(&interface.id) { - if let Err(error) = broadcaster.send(SignalWrapper { - signal: stats_signal.clone(), - instance_id: String::new(), + if let Err(error) = broadcaster.send(SignalWrapper::InstanceStats { + instance_stats: instance_stats.clone(), }) { notify(error.to_string()); } diff --git a/crates/conductor_lib/src/signal_wrapper.rs b/crates/conductor_lib/src/signal_wrapper.rs index b5623e342b..e4ab5ac007 100644 --- a/crates/conductor_lib/src/signal_wrapper.rs +++ b/crates/conductor_lib/src/signal_wrapper.rs @@ -1,10 +1,17 @@ -use holochain_core::signal::Signal; +use holochain_core::{context::InstanceStats, signal::Signal}; use holochain_json_api::{error::JsonError, json::JsonString}; +use std::collections::HashMap; -/// This struct wraps a Signal from core before serializing and sending over +/// This enum wraps a Signal from core before serializing and sending over /// an interface to the UI or other client. #[derive(Serialize, Deserialize, Debug, DefaultJson)] -pub struct SignalWrapper { - pub signal: Signal, - pub instance_id: String, +#[allow(clippy::large_enum_variant)] +pub enum SignalWrapper { + InstanceSignal { + signal: Signal, + instance_id: String, + }, + InstanceStats { + instance_stats: HashMap, + }, } diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index be8a097097..1533f56756 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -25,6 +25,7 @@ use holochain_core_types::{ }, error::{HcResult, HolochainError}, }; +use holochain_json_api::{error::JsonError, json::JsonString}; use holochain_locksmith::{Mutex, MutexGuard, RwLock, RwLockReadGuard}; use holochain_metrics::MetricPublisher; use holochain_net::{p2p_config::P2pConfig, p2p_network::P2pNetwork}; @@ -45,7 +46,6 @@ use std::{ time::Duration, }; -use crate::signal::InstanceStats; #[cfg(test)] use test_utils::mock_signing::mock_conductor_api; @@ -68,6 +68,15 @@ impl<'a> P2pNetworkMutexGuardWrapper<'a> { } } +#[derive(Clone, Debug, Serialize, Deserialize, DefaultJson, PartialEq)] +pub struct InstanceStats { + pub number_held_entries: usize, + pub number_held_aspects: usize, + pub number_pending_validations: usize, + pub number_running_zome_calls: usize, + pub offline: bool, +} + /// Context holds the components that parts of a Holochain instance need in order to operate. /// This includes components that are injected from the outside like persister /// but also the store of the instance that gets injected before passing on the context diff --git a/crates/core/src/signal.rs b/crates/core/src/signal.rs index d19721f9dc..cf9114e88c 100644 --- a/crates/core/src/signal.rs +++ b/crates/core/src/signal.rs @@ -4,7 +4,7 @@ use holochain_json_api::{error::JsonError, json::JsonString}; use holochain_wasm_utils::api_serialization::emit_signal::EmitSignalArgs; use serde::{Deserialize, Deserializer}; use snowflake::ProcessUniqueId; -use std::{collections::HashMap, thread}; +use std::thread; #[derive(Clone, Debug, Serialize, DefaultJson)] #[serde(tag = "signal_type")] @@ -13,7 +13,6 @@ pub enum Signal { Trace(ActionWrapper), Consistency(ConsistencySignal), User(UserSignal), - Stats(StatsSignal), } #[derive(Clone, Debug, Serialize, Deserialize, DefaultJson, PartialEq)] @@ -31,20 +30,6 @@ impl From for UserSignal { } } -#[derive(Clone, Debug, Serialize, Deserialize, DefaultJson, PartialEq)] -pub struct StatsSignal { - pub instance_stats: HashMap, -} - -#[derive(Clone, Debug, Serialize, Deserialize, DefaultJson, PartialEq)] -pub struct InstanceStats { - pub number_held_entries: usize, - pub number_held_aspects: usize, - pub number_pending_validations: usize, - pub number_running_zome_calls: usize, - pub offline: bool, -} - impl<'de> Deserialize<'de> for Signal { fn deserialize(_deserializer: D) -> Result where From a29afcd95c776001346cdac5b2394d0c86d47f83 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 11 Dec 2019 22:16:58 +0100 Subject: [PATCH 11/13] serde(tag="type") to make SignalWrapper's JSON representation backwards compatible --- crates/conductor_lib/src/signal_wrapper.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/conductor_lib/src/signal_wrapper.rs b/crates/conductor_lib/src/signal_wrapper.rs index e4ab5ac007..0988157c50 100644 --- a/crates/conductor_lib/src/signal_wrapper.rs +++ b/crates/conductor_lib/src/signal_wrapper.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; /// an interface to the UI or other client. #[derive(Serialize, Deserialize, Debug, DefaultJson)] #[allow(clippy::large_enum_variant)] +#[serde(tag = "type")] pub enum SignalWrapper { InstanceSignal { signal: Signal, From 68c64d451a9dc91ce310278ee11942ed7f03dc0d Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 11 Dec 2019 22:22:06 +0100 Subject: [PATCH 12/13] changelog --- CHANGELOG-UNRELEASED.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG-UNRELEASED.md b/CHANGELOG-UNRELEASED.md index 3e2185e437..d3c1c591d0 100644 --- a/CHANGELOG-UNRELEASED.md +++ b/CHANGELOG-UNRELEASED.md @@ -8,7 +8,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Adds support for [hApp-bundles](https://github.com/holochain/holoscape/tree/master/example-bundles) to `hc run`. This enables complex hApp setups with multiple DNAs and bridges to be easily run during development without having to write/maintain a conductor config file. [#1939](https://github.com/holochain/holochain-rust/pull/1939) - Adds ability to validate entries with full chain validation when author is offline [#1932](https://github.com/holochain/holochain-rust/pull/1932) - +- Adds a conductor level stats-signal that sends an overview of instance data (number of held entries etc.) over admin interfaces. [#1954](https://github.com/holochain/holochain-rust/pull/1954) +- Adds parameters to conductor RPC function `debug/state_dump` to select portions of the state to be send instead of always receiving the full dump (which can get big if the instance holds many entries). [#1954](https://github.com/holochain/holochain-rust/pull/1954) ### Changed ### Deprecated From 6eb28526fc40e8cb4aca41bf5993f3bfe0a10247 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 11 Dec 2019 22:23:40 +0100 Subject: [PATCH 13/13] Revert thread::yield_now() back to sleep() to not eat up all free CPU time --- crates/conductor_lib/src/conductor/base.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/conductor_lib/src/conductor/base.rs b/crates/conductor_lib/src/conductor/base.rs index ece84a98e0..124282d37e 100644 --- a/crates/conductor_lib/src/conductor/base.rs +++ b/crates/conductor_lib/src/conductor/base.rs @@ -431,7 +431,7 @@ impl Conductor { if kill_switch_rx.try_recv().is_ok() { break; } - thread::yield_now(); + thread::sleep(Duration::from_millis(1)); }) .expect("Must be able to spawn thread") }