diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 8ba8c89b545..fde82cd7fba 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -72,7 +72,6 @@ pub struct ClientBuilder { http_metrics_config: http_metrics::Config, slasher: Option>>, eth_spec_instance: T::EthSpec, - } impl diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 06ea2d2ad46..475ae782e4f 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -35,6 +35,7 @@ use eth2::types::{ }; use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; +use logging::SSELoggingComponents; use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; use operation_pool::ReceivedPreCapella; use parking_lot::RwLock; @@ -73,7 +74,6 @@ use warp_utils::{ query::multi_key_query, task::{blocking_json_task, blocking_task}, }; -use logging::SSELoggingComponents; const API_PREFIX: &str = "eth"; @@ -450,7 +450,6 @@ pub fn serve( let inner_components = ctx.sse_logging_components.clone(); let sse_component_filter = warp::any().map(move || inner_components.clone()); - // Create a `warp` filter that provides access to local system information. let system_info = Arc::new(RwLock::new(sysinfo::System::new())); { @@ -3566,52 +3565,48 @@ pub fn serve( }, ); - // Subscribe to logs via Server Side Events // /lighthouse/logs - let lighthouse_log_events = warp::path("lighthouse") + let lighthouse_log_events = warp::path("lighthouse") .and(warp::path("logs")) .and(warp::path::end()) .and(sse_component_filter) - .and_then( - |sse_component: Option| { - blocking_task(move || { - - if let Some(logging_components) = sse_component { + .and_then(|sse_component: Option| { + blocking_task(move || { + if let Some(logging_components) = sse_component { // Build a JSON stream - let s = BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| { - match msg { - Ok(data) => { - // Serialize to json - match data.to_json_string() { - // Send the json as a Server Sent Event - Ok(json) => Event::default() - .json_data(json) - .map_err(|e| { - warp_utils::reject::server_sent_event_error(format!("{:?}", e)) + let s = + BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| { + match msg { + Ok(data) => { + // Serialize to json + match data.to_json_string() { + // Send the json as a Server Sent Event + Ok(json) => Event::default().json_data(json).map_err(|e| { + warp_utils::reject::server_sent_event_error(format!( + "{:?}", + e + )) }), - Err(e) => Err(warp_utils::reject::server_sent_event_error( - format!("Unable to serialize to JSON {}",e), - )) + Err(e) => Err(warp_utils::reject::server_sent_event_error( + format!("Unable to serialize to JSON {}", e), + )), + } } + Err(e) => Err(warp_utils::reject::server_sent_event_error( + format!("Unable to serialize to JSON {}", e), + )), } - Err(e) => Err(warp_utils::reject::server_sent_event_error( - format!("Unable to serialize to JSON {}",e), - )) - } - }); + }); Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s))) - } else { - return Err(warp_utils::reject::custom_server_error( - "SSE Logging is not enabled".to_string(), - )); - } - }) - - }, - ); - + } else { + return Err(warp_utils::reject::custom_server_error( + "SSE Logging is not enabled".to_string(), + )); + } + }) + }); // Define the ultimate set of routes that will be provided to the server. let routes = warp::get() diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index a1579dda795..53915b52d96 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -12,6 +12,7 @@ use eth2_network_config::Eth2NetworkConfig; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::{future, StreamExt}; +use logging::SSELoggingComponents; use serde_derive::{Deserialize, Serialize}; use slog::{error, info, o, warn, Drain, Duplicate, Level, Logger}; use sloggers::{file::FileLoggerBuilder, types::Format, types::Severity, Build}; @@ -19,7 +20,6 @@ use std::fs::create_dir_all; use std::io::{Result as IOResult, Write}; use std::path::PathBuf; use std::sync::Arc; -use logging::SSELoggingComponents; use task_executor::{ShutdownReason, TaskExecutor}; use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; use types::{EthSpec, GnosisEthSpec, MainnetEthSpec, MinimalEthSpec}; @@ -311,7 +311,6 @@ impl EnvironmentBuilder { .build() .map_err(|e| format!("Unable to build file logger: {}", e))?; - let mut log = Logger::root(Duplicate::new(stdout_logger, file_logger).fuse(), o!()); info!( @@ -322,7 +321,7 @@ impl EnvironmentBuilder { // If the http API is enabled, we may need to send logs to be consumed by subscribers. if config.sse_logging { - let sse_logger = SSELoggingComponents::new(SSE_LOG_CHANNEL_SIZE); + let sse_logger = SSELoggingComponents::new(SSE_LOG_CHANNEL_SIZE); self.sse_logging_components = Some(sse_logger.clone()); log = Logger::root(Duplicate::new(log, sse_logger).fuse(), o!()); @@ -378,7 +377,6 @@ impl EnvironmentBuilder { } } - /// An environment where Lighthouse services can run. Used to start a production beacon node or /// validator client, or to run tests that involve logging and async task execution. pub struct Environment { diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 7906263dbb4..092ae431017 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -474,8 +474,8 @@ fn run( let sse_logging = { if let Some(bn_matches) = matches.subcommand_matches("beacon_node") { bn_matches.is_present("http") || bn_matches.is_present("gui") - } else if let Some(vc_matches) = matches.subcommand_matches("validator_client") { - vc_matches.is_present("http") + } else if let Some(vc_matches) = matches.subcommand_matches("validator_client") { + vc_matches.is_present("http") } else { false } diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index e3962e510b3..76849a9f2b8 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -16,6 +16,7 @@ use eth2::lighthouse_vc::{ types::{self as api_types, GenericResponse, Graffiti, PublicKey, PublicKeyBytes}, }; use lighthouse_version::version_with_platform; +use logging::SSELoggingComponents; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use slog::{crit, info, warn, Logger}; @@ -29,18 +30,17 @@ use std::sync::Arc; use sysinfo::{System, SystemExt}; use system_health::observe_system_health_vc; use task_executor::TaskExecutor; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; -use logging::SSELoggingComponents; -use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use warp::{ http::{ header::{HeaderValue, CONTENT_TYPE}, response::Response, StatusCode, }, - Filter, sse::Event, + Filter, }; #[derive(Debug)] @@ -198,7 +198,7 @@ pub fn serve( let api_token_path_inner = api_token_path.clone(); let api_token_path_filter = warp::any().map(move || api_token_path_inner.clone()); - + // Filter for SEE Logging events let inner_components = ctx.sse_logging_components.clone(); let sse_component_filter = warp::any().map(move || inner_components.clone()); @@ -983,51 +983,48 @@ pub fn serve( }) }); - // Subscribe to get VC logs via Server side events // /lighthouse/logs - let get_log_events = warp::path("lighthouse") + let get_log_events = warp::path("lighthouse") .and(warp::path("logs")) .and(warp::path::end()) .and(sse_component_filter) - .and_then( - |sse_component: Option| { - warp_utils::task::blocking_task(move || { - - if let Some(logging_components) = sse_component { + .and_then(|sse_component: Option| { + warp_utils::task::blocking_task(move || { + if let Some(logging_components) = sse_component { // Build a JSON stream - let s = BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| { - match msg { - Ok(data) => { - // Serialize to json - match data.to_json_string() { - // Send the json as a Server Sent Event - Ok(json) => Event::default() - .json_data(json) - .map_err(|e| { - warp_utils::reject::server_sent_event_error(format!("{:?}", e)) + let s = + BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| { + match msg { + Ok(data) => { + // Serialize to json + match data.to_json_string() { + // Send the json as a Server Sent Event + Ok(json) => Event::default().json_data(json).map_err(|e| { + warp_utils::reject::server_sent_event_error(format!( + "{:?}", + e + )) }), - Err(e) => Err(warp_utils::reject::server_sent_event_error( - format!("Unable to serialize to JSON {}",e), - )) + Err(e) => Err(warp_utils::reject::server_sent_event_error( + format!("Unable to serialize to JSON {}", e), + )), + } } + Err(e) => Err(warp_utils::reject::server_sent_event_error( + format!("Unable to serialize to JSON {}", e), + )), } - Err(e) => Err(warp_utils::reject::server_sent_event_error( - format!("Unable to serialize to JSON {}",e), - )) - } - }); + }); Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s))) - } else { - return Err(warp_utils::reject::custom_server_error( - "SSE Logging is not enabled".to_string(), - )); - } - }) - - }, - ); + } else { + return Err(warp_utils::reject::custom_server_error( + "SSE Logging is not enabled".to_string(), + )); + } + }) + }); let routes = warp::any() .and(authorization_header_filter)