diff --git a/Cargo.lock b/Cargo.lock
index 70190e998f0..687c774bedf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4498,14 +4498,16 @@ dependencies = [
 name = "logging"
 version = "0.2.0"
 dependencies = [
- "crossbeam-channel",
  "lazy_static",
  "lighthouse_metrics",
+ "parking_lot 0.12.1",
+ "serde",
  "slog",
  "slog-async",
  "slog-term",
  "sloggers",
  "take_mut",
+ "tokio",
 ]
 
 [[package]]
@@ -6781,9 +6783,9 @@ dependencies = [
 
 [[package]]
 name = "serde"
-version = "1.0.152"
+version = "1.0.154"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
+checksum = "8cdd151213925e7f1ab45a9bbfb129316bd00799784b174b7cc7bcd16961c49e"
 dependencies = [
  "serde_derive",
 ]
@@ -6820,9 +6822,9 @@ dependencies = [
 
 [[package]]
 name = "serde_derive"
-version = "1.0.152"
+version = "1.0.154"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
+checksum = "4fc80d722935453bcafdc2c9a73cd6fac4dc1938f0346035d84bf99fa9e33217"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index fe0f21145bb..26e1f9a9051 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -3572,74 +3572,33 @@ pub fn serve(
     let get_events = warp::path("lighthouse")
         .and(warp::path("logs"))
         .and(warp::path::end())
+        .and(sse_component_filter)
         .and_then(
-            |topics_res: Result<api_types::EventQuery, warp::Rejection>,
-             chain: Arc<BeaconChain<T>>| {
+            |sse_component: Option<SSELoggingComponents>| {
                 blocking_task(move || {
-                    let topics = topics_res?;
-                    // for each topic subscribed spawn a new subscription
-                    let mut receivers = Vec::with_capacity(topics.topics.len());
-                    if let Some(event_handler) = chain.event_handler.as_ref() {
-                        for topic in topics.topics {
-                            let receiver = match topic {
-                                api_types::EventTopic::Head => event_handler.subscribe_head(),
-                                api_types::EventTopic::Block => event_handler.subscribe_block(),
-                                api_types::EventTopic::Attestation => {
-                                    event_handler.subscribe_attestation()
-                                }
-                                api_types::EventTopic::VoluntaryExit => {
-                                    event_handler.subscribe_exit()
-                                }
-                                api_types::EventTopic::FinalizedCheckpoint => {
-                                    event_handler.subscribe_finalized()
-                                }
-                                api_types::EventTopic::ChainReorg => {
-                                    event_handler.subscribe_reorgs()
-                                }
-                                api_types::EventTopic::ContributionAndProof => {
-                                    event_handler.subscribe_contributions()
-                                }
-                                api_types::EventTopic::LateHead => {
-                                    event_handler.subscribe_late_head()
-                                }
-                                api_types::EventTopic::BlockReward => {
-                                    event_handler.subscribe_block_reward()
-                                }
-                            };
-
-                            receivers.push(BroadcastStream::new(receiver).map(|msg| {
-                                match msg {
-                                    Ok(data) => Event::default()
-                                        .event(data.topic_name())
-                                        .json_data(data)
-                                        .map_err(|e| {
-                                            warp_utils::reject::server_sent_event_error(format!(
-                                                "{:?}",
-                                                e
-                                            ))
-                                        }),
-                                    Err(e) => Err(warp_utils::reject::server_sent_event_error(
-                                        format!("{:?}", e),
-                                    )),
-                                }
-                            }));
-                        }
+                    if let Some(logging_components) = sse_component {
+                        // Build a JSON stream from the broadcast channel of log records.
+                        let s = BroadcastStream::new(logging_components.sender.subscribe()).map(
+                            |msg| match msg {
+                                // Serialize each log record to JSON and send it as a
+                                // server-sent event.
+                                Ok(data) => Event::default().json_data(data).map_err(|e| {
+                                    warp_utils::reject::server_sent_event_error(format!("{:?}", e))
+                                }),
+                                Err(e) => Err(warp_utils::reject::server_sent_event_error(
+                                    format!("{:?}", e),
+                                )),
+                            },
+                        );
+
+                        Ok::<_, warp::Rejection>(warp::sse::reply(
+                            warp::sse::keep_alive().stream(s),
+                        ))
                     } else {
                         return Err(warp_utils::reject::custom_server_error(
-                            "event handler was not initialized".to_string(),
+                            "SSE Logging is not enabled".to_string(),
                         ));
                     }
-
-                    let s = futures::stream::select_all(receivers);
-
-                    Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
                 })
             },
         );
 
     // Define the ultimate set of routes that will be provided to the server.
     let routes = warp::get()
         .and(
diff --git a/common/logging/Cargo.toml b/common/logging/Cargo.toml
index 3d009251e42..fb8e8bf875a 100644
--- a/common/logging/Cargo.toml
+++ b/common/logging/Cargo.toml
@@ -10,9 +10,11 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
 [dependencies]
 slog = "2.5.2"
 slog-term = "2.6.0"
+tokio = { version = "1.14.0", features = ["sync"] }
 lighthouse_metrics = { path = "../lighthouse_metrics" }
 lazy_static = "1.4.0"
 sloggers = { version = "2.1.1", features = ["json"] }
 slog-async = "2.7.0"
 take_mut = "0.2.2"
-crossbeam-channel = "0.5.7"
+parking_lot = "0.12.1"
+serde = "1.0.153"
diff --git a/common/logging/src/async_record.rs b/common/logging/src/async_record.rs
index 905294c83f9..914d603cbba 100644
--- a/common/logging/src/async_record.rs
+++ b/common/logging/src/async_record.rs
@@ -1,13 +1,24 @@
 //! An object that can be used to pass through a channel and be cloned. It can therefore be used
 //! via the broadcast channel.
 
+use parking_lot::Mutex;
+use serde::ser::SerializeMap;
+use serde::serde_if_integer128;
+use serde::Serialize;
 use slog::{
     BorrowedKV, Drain, Key, Level, OwnedKVList, Record, RecordStatic, Serializer, SingleKV, KV,
 };
+use std::cell::RefCell;
 use std::fmt;
+use std::fmt::Write;
+use std::io;
 use std::sync::Arc;
 use take_mut::take;
 
+thread_local! {
+    static TL_BUF: RefCell<String> = RefCell::new(String::with_capacity(128))
+}
+
 /// Serialized record.
 #[derive(Clone)]
 pub struct AsyncRecord {
@@ -16,7 +27,7 @@ pub struct AsyncRecord {
     location: Box<slog::RecordLocation>,
     tag: String,
     logger_values: OwnedKVList,
-    kv: Arc<dyn KV + Send>,
+    kv: Arc<Mutex<dyn KV + Send>>,
 }
 
 impl AsyncRecord {
@@ -34,7 +45,7 @@ impl AsyncRecord {
             location: Box::new(*record.location()),
             tag: String::from(record.tag()),
             logger_values: logger_values.clone(),
-            kv: Arc::new(ser.finish()),
+            kv: Arc::new(Mutex::new(ser.finish())),
         }
     }
 
@@ -46,8 +57,9 @@ impl AsyncRecord {
             tag: &self.tag,
         };
 
+        let kv = self.kv.lock();
         drain.log(
-            &Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&self.kv)),
+            &Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
             &self.logger_values,
         )
     }
@@ -60,8 +72,9 @@ impl AsyncRecord {
             tag: &self.tag,
         };
 
+        let kv = self.kv.lock();
         f(
-            &Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&self.kv)),
+            &Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
             &self.logger_values,
         )
     }
@@ -167,11 +180,137 @@ impl Serializer for ToSendSerializer {
         take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
         Ok(())
     }
+}
 
-    #[cfg(feature = "nested-values")]
-    fn emit_serde(&mut self, key: Key, value: &slog::SerdeValue) -> slog::Result {
-        let val = value.to_sendable();
-        take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
-        Ok(())
+impl Serialize for AsyncRecord {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let rs = RecordStatic {
+            location: &*self.location,
+            level: self.level,
+            tag: &self.tag,
+        };
+        let mut map_serializer = SerdeSerializer::new(serializer)?;
+        let kv = self.kv.lock();
+
+        // The `Record` is rebuilt inside each statement because the value returned by
+        // `format_args!` cannot outlive the statement that creates it.
+        self.logger_values
+            .serialize(
+                &Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
+                &mut map_serializer,
+            )
+            .map_err(serde::ser::Error::custom)?;
+        kv.serialize(
+            &Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
+            &mut map_serializer,
+        )
+        .map_err(serde::ser::Error::custom)?;
+        map_serializer.end()
+    }
+}
+
+struct SerdeSerializer<S: serde::Serializer> {
+    /// Current state of map serializing: `serde::Serializer::MapState`
+    ser_map: S::SerializeMap,
+}
+
+impl<S: serde::Serializer> SerdeSerializer<S> {
+    fn new(ser: S) -> Result<Self, S::Error> {
+        let ser_map = ser.serialize_map(None)?;
+        Ok(SerdeSerializer { ser_map })
+    }
+
+    /// Finish serialization, and return the serializer
+    fn end(self) -> Result<S::Ok, S::Error> {
+        self.ser_map.end()
+    }
+}
+
+// NOTE: This is borrowed from slog_json
+macro_rules! impl_m(
+    ($s:expr, $key:expr, $val:expr) => ({
+        let k_s: &str = $key.as_ref();
+        $s.ser_map
+            .serialize_entry(k_s, $val)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("serde serialization error: {}", e)))?;
+        Ok(())
+    });
+);
+
+impl<S> slog::Serializer for SerdeSerializer<S>
+where
+    S: serde::Serializer,
+{
+    fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+
+    fn emit_unit(&mut self, key: Key) -> slog::Result {
+        impl_m!(self, key, &())
+    }
+
+    fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+
+    fn emit_none(&mut self, key: Key) -> slog::Result {
+        let val: Option<()> = None;
+        impl_m!(self, key, &val)
+    }
+    fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    serde_if_integer128! {
+        fn emit_u128(&mut self, key: Key, val: u128) -> slog::Result {
+            impl_m!(self, key, &val)
+        }
+        fn emit_i128(&mut self, key: Key, val: i128) -> slog::Result {
+            impl_m!(self, key, &val)
+        }
+    }
+    fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
+        impl_m!(self, key, &val)
+    }
+    fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
+        TL_BUF.with(|buf| {
+            let mut buf = buf.borrow_mut();
+
+            buf.write_fmt(*val).unwrap();
+
+            let res = { || impl_m!(self, key, &*buf) }();
+            buf.clear();
+            res
+        })
     }
 }
diff --git a/common/logging/src/lib.rs b/common/logging/src/lib.rs
index cec230dc3e9..a9ad25f3f3e 100644
--- a/common/logging/src/lib.rs
+++ b/common/logging/src/lib.rs
@@ -12,8 +12,8 @@ use std::time::{Duration, Instant};
 pub const MAX_MESSAGE_WIDTH: usize = 40;
 
 pub mod async_record;
-mod sse_drain;
-pub use sse_drain::{SSEDrain, SSELoggingComponents};
+mod sse_logging_components;
+pub use sse_logging_components::SSELoggingComponents;
 
 /// The minimum interval between log messages indicating that a queue is full.
 const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
diff --git a/common/logging/src/sse_drain.rs b/common/logging/src/sse_drain.rs
deleted file mode 100644
index aeee86169fc..00000000000
--- a/common/logging/src/sse_drain.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-//! This module provides an implementation of `slog::Drain` that optionally writes to a channel if
-//! there are subscribers to a HTTP SSE stream.
-
-use crossbeam_channel::{Receiver, Sender, TrySendError};
-use slog::{Drain, OwnedKVList, Record};
-use slog_async::AsyncRecord;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
-
-/// The components required in the HTTP API task to receive logged events.
-pub struct SSELoggingComponents {
-    /// The channel to receive events from.
-    pub receiver: Receiver<AsyncRecord>,
-    /// Indicates if there are currently subscribers to the http API.
-    pub subscribers: Arc<AtomicBool>,
-}
-
-/// An slog drain used to pass logs to the SSE event stream in the HTTP API.
-pub struct SSEDrain {
-    /// The channel to send events to.
-    sender: Sender<AsyncRecord>,
-    /// Indicates if there are currently subscribers to the http API.
-    pub subscribers: Arc<AtomicBool>,
-}
-
-impl SSEDrain {
-    /// Create a new SSE drain.
-    pub fn new(channel_size: usize) -> (Self, SSELoggingComponents) {
-        let (sender, receiver) = crossbeam_channel::bounded::<AsyncRecord>(channel_size);
-        let subscribers = Arc::new(AtomicBool::new(false));
-
-        let drain = SSEDrain {
-            sender,
-            subscribers: subscribers.clone(),
-        };
-        (
-            drain,
-            SSELoggingComponents {
-                receiver,
-                subscribers,
-            },
-        )
-    }
-}
-
-impl Drain for SSEDrain {
-    type Ok = ();
-    type Err = &'static str;
-
-    fn log(&self, record: &Record, logger_values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
-        if !self.subscribers.load(Ordering::Relaxed) {
-            return Ok(()); // Drop the logs, there are no subscribers
-        }
-
-        // There are subscribers, attempt to send the logs
-        match self
-            .sender
-            .try_send(AsyncRecord::from(record, logger_values))
-        {
-            Ok(()) => {} // Everything got sent
-            Err(TrySendError::Full(_failed_log)) => {} // Ignore dropped logs
-
-            Err(TrySendError::Disconnected(_failed_log)) => {
-                return Err("Channel Disconnected");
-            }
-        }
-        Ok(())
-    }
-}
diff --git a/common/logging/src/sse_logging_components.rs b/common/logging/src/sse_logging_components.rs
new file mode 100644
index 00000000000..e57d2aa2cd4
--- /dev/null
+++ b/common/logging/src/sse_logging_components.rs
@@ -0,0 +1,39 @@
+//! This module provides an implementation of `slog::Drain` that optionally writes to a channel if
+//! there are subscribers to a HTTP SSE stream.
+
+use crate::async_record::AsyncRecord;
+use slog::{Drain, Level, OwnedKVList, Record, KV};
+use std::panic::AssertUnwindSafe;
+use std::sync::Arc;
+use tokio::sync::broadcast::{error::SendError, Receiver, Sender};
+
+/// The components required in the HTTP API task to receive logged events.
+#[derive(Clone)]
+pub struct SSELoggingComponents {
+    /// The sender for the broadcast channel; the HTTP API subscribes to it to receive log events.
+    pub sender: Arc<AssertUnwindSafe<Sender<AsyncRecord>>>,
+}
+
+impl SSELoggingComponents {
+    /// Create a new SSE drain.
+    pub fn new(channel_size: usize) -> Self {
+        let (sender, _receiver) = tokio::sync::broadcast::channel(channel_size);
+
+        let sender = Arc::new(AssertUnwindSafe(sender));
+        SSELoggingComponents { sender }
+    }
+}
+
+impl Drain for SSELoggingComponents {
+    type Ok = ();
+    type Err = &'static str;
+
+    fn log(&self, record: &Record, logger_values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
+        // Attempt to send the logs
+        match self.sender.send(AsyncRecord::from(record, logger_values)) {
+            Ok(_num_sent) => {} // Everything got sent
+            Err(_err) => {}     // There are no subscribers, do nothing
+        }
+        Ok(())
+    }
+}
diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs
index 7d6e82e236d..500d2b32be2 100644
--- a/lighthouse/environment/src/lib.rs
+++ b/lighthouse/environment/src/lib.rs
@@ -19,7 +19,7 @@ use std::fs::create_dir_all;
 use std::io::{Result as IOResult, Write};
 use std::path::PathBuf;
 use std::sync::Arc;
-use logging::{SSELoggingComponents, SSEDrain};
+use logging::SSELoggingComponents;
 use task_executor::{ShutdownReason, TaskExecutor};
 use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
 use types::{EthSpec, GnosisEthSpec, MainnetEthSpec, MinimalEthSpec};
@@ -320,8 +320,8 @@
         // If the http API is enabled, we may need to send logs to be consumed by subscribers.
         if config.sse_logging {
-            let (sse_logger, components) = SSEDrain::new(SSE_LOG_CHANNEL_SIZE);
-            self.sse_logging_components = Some(components);
+            let sse_logger = SSELoggingComponents::new(SSE_LOG_CHANNEL_SIZE);
+            self.sse_logging_components = Some(sse_logger.clone());
 
             log = Logger::root(Duplicate::new(log, sse_logger).fuse(), o!());
         }
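
For reference, a subscriber on the HTTP API side consumes the broadcast channel roughly as sketched below. This is illustrative only, not code from the patch: it assumes the `futures`, `tokio-stream` and `serde_json` crates are available, and the function name `print_json_logs` is hypothetical.

// Illustrative sketch only; assumes `futures`, `tokio-stream` and `serde_json` are available.
use futures::StreamExt;
use logging::SSELoggingComponents;
use tokio_stream::wrappers::BroadcastStream;

/// Drain the SSE logging channel and print each record as a JSON line.
async fn print_json_logs(components: SSELoggingComponents) {
    // Every subscriber gets its own receiver; slow subscribers miss messages
    // instead of blocking the logger.
    let mut stream = BroadcastStream::new(components.sender.subscribe());
    while let Some(msg) = stream.next().await {
        match msg {
            // `AsyncRecord` implements `serde::Serialize`, so it can be rendered as JSON.
            Ok(record) => match serde_json::to_string(&record) {
                Ok(json) => println!("{}", json),
                Err(e) => eprintln!("failed to serialize log record: {:?}", e),
            },
            // The stream yields an error when this subscriber lagged behind the channel.
            Err(e) => eprintln!("missed log messages: {:?}", e),
        }
    }
}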