Skip to content

Commit

Permalink
Tokio broadcast version
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Mar 9, 2023
1 parent e34aab2 commit 4da3351
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 44 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 23 additions & 8 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3569,7 +3569,7 @@ pub fn serve<T: BeaconChainTypes>(

// Subscribe to logs via Server Side Events
// /lighthouse/logs
let get_events = warp::path("lighthouse")
let lighthouse_log_events = warp::path("lighthouse")
.and(warp::path("logs"))
.and(warp::path::end())
.and(sse_component_filter)
Expand All @@ -3579,13 +3579,27 @@ pub fn serve<T: BeaconChainTypes>(

if let Some(logging_components) = sse_component {
// Build a JSON stream
let
let s = BroadcastStream::new(sse_component.sender.subscribe()).map(|msg| {
// Serialize to json



;
let s = BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match data.to_json_string() {
// Send the json as a Server Sent Event
Ok(json) => Event::default()
.json_data(json)
.map_err(|e| {
warp_utils::reject::server_sent_event_error(format!("{:?}", e))
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}",e),
))
}
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}",e),
))
}
});

Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
} else {
Expand Down Expand Up @@ -3665,6 +3679,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_lighthouse_block_packing_efficiency.boxed())
.or(get_lighthouse_merge_readiness.boxed())
.or(get_events.boxed())
.or(lighthouse_log_events.boxed())
.recover(warp_utils::reject::handle_rejection),
)
.boxed()
Expand Down
1 change: 1 addition & 0 deletions common/logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ slog-async = "2.7.0"
take_mut = "0.2.2"
parking_lot = "0.12.1"
serde = "1.0.153"
serde_json = "1.0.94"
35 changes: 3 additions & 32 deletions common/logging/src/async_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ use parking_lot::Mutex;
use serde::ser::SerializeMap;
use serde::serde_if_integer128;
use serde::Serialize;
use slog::{
BorrowedKV, Drain, Key, Level, OwnedKVList, Record, RecordStatic, Serializer, SingleKV, KV,
};
use slog::{BorrowedKV, Key, Level, OwnedKVList, Record, RecordStatic, Serializer, SingleKV, KV};
use std::cell::RefCell;
use std::fmt;
use std::fmt::Write;
use std::io;
use std::sync::Arc;
use take_mut::take;

Expand Down Expand Up @@ -49,34 +46,8 @@ impl AsyncRecord {
}
}

/// Writes the record to a `Drain`.
pub fn log_to<D: Drain>(self, drain: &D) -> Result<D::Ok, D::Err> {
let rs = RecordStatic {
location: &*self.location,
level: self.level,
tag: &self.tag,
};

let kv = self.kv.lock();
drain.log(
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
&self.logger_values,
)
}

/// Deconstruct this `AsyncRecord` into a record and `OwnedKVList`.
pub fn as_record_values(&self, mut f: impl FnMut(&Record, &OwnedKVList)) {
let rs = RecordStatic {
location: &*self.location,
level: self.level,
tag: &self.tag,
};

let kv = self.kv.lock();
f(
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
&self.logger_values,
)
pub fn to_json_string(&self) -> Result<String, String> {
serde_json::to_string(&self).map_err(|e| format!("{:?}", e))
}
}

Expand Down
4 changes: 2 additions & 2 deletions common/logging/src/sse_logging_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
//! there are subscribers to a HTTP SSE stream.

use crate::async_record::AsyncRecord;
use slog::{Drain, Level, OwnedKVList, Record, KV};
use slog::{Drain, OwnedKVList, Record};
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};
use tokio::sync::broadcast::Sender;

/// The components required in the HTTP API task to receive logged events.
#[derive(Clone)]
Expand Down

0 comments on commit 4da3351

Please sign in to comment.