diff --git a/Cargo.toml b/Cargo.toml
index cff806fb6..12450f59e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ members = [
     "utils/btdht",
     "utils/stress-test",
     "utils/swarm",
+    "utils/protocol-analyzer",
     "vfs"
 ]
 resolver = "2"
diff --git a/utils/protocol-analyzer/Cargo.toml b/utils/protocol-analyzer/Cargo.toml
new file mode 100644
index 000000000..66dbc3e26
--- /dev/null
+++ b/utils/protocol-analyzer/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "ouisync-protocol-analyzer"
+description = "Utility to parse protocol logs and digest them into human readable output"
+publish = false
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+[[bin]]
+name = "protocol-analyzer"
+path = "src/main.rs"
+
+[dependencies]
+anyhow = "1.0.75"
+clap = { workspace = true }
+tokio = { workspace = true, features = ["signal", "io-std", "fs", "macros", "rt-multi-thread", "io-util"] }
+tokio-stream = { workspace = true, features = ["sync"] }
+tokio-util = { workspace = true }
+chrono = "0.4.31"
diff --git a/utils/protocol-analyzer/src/main.rs b/utils/protocol-analyzer/src/main.rs
new file mode 100644
index 000000000..61459735e
--- /dev/null
+++ b/utils/protocol-analyzer/src/main.rs
@@ -0,0 +1,456 @@
+use anyhow::Result;
+use chrono::{NaiveDate, NaiveDateTime};
+use clap::Parser;
+use std::{
+    collections::{BTreeMap, HashMap},
+    path::PathBuf,
+};
+use tokio::fs::File;
+use tokio::io::{AsyncBufReadExt, BufReader};
+
+/// Line numbers of received messages, keyed by `(sender label, receiver label)`.
+type FromToLines = BTreeMap<(String, String), Vec<u32>>;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let options = Options::parse();
+
+    let file = File::open(options.logfile).await?;
+    let reader = BufReader::new(file);
+    let mut lines = reader.lines();
+    // Reported line numbers are 0-based.
+    let mut next_line_number = 0;
+
+    let mut context = Context::new();
+
+    while let Some(input_line) = lines.next_line().await? {
+        let line_number = next_line_number;
+        next_line_number += 1;
+
+        let line = match parse::line(line_number, &input_line, &mut context) {
+            Some(line) => line,
+            // Not one of the line kinds we analyze.
+            None => continue,
+        };
+
+        println!("{line:?}");
+
+        match line {
+            Line::ThisRuntimeId(_) => (),
+            Line::ReceivedRootNode(line) => record(
+                &mut context.from_to_root_nodes,
+                line.that_label,
+                line.common_prefix,
+            ),
+            Line::ReceivedInnerNode(line) => record(
+                &mut context.from_to_inner_nodes,
+                line.that_label,
+                line.common_prefix,
+            ),
+            Line::ReceivedLeafNode(line) => record(
+                &mut context.from_to_leaf_nodes,
+                line.that_label,
+                line.common_prefix,
+            ),
+            Line::ReceivedBlock(line) => record(
+                &mut context.from_to_blocks,
+                line.that_label,
+                line.common_prefix,
+            ),
+        }
+    }
+
+    print_summary("RootNodes", &context.from_to_root_nodes);
+    print_summary("InnerNodes", &context.from_to_inner_nodes);
+    print_summary("LeafNodes", &context.from_to_leaf_nodes);
+    print_summary("Blocks", &context.from_to_blocks);
+
+    Ok(())
+}
+
+/// Records that the message logged at `prefix.line_number` was received from `from` by the
+/// peer labelled `prefix.label`.
+fn record(map: &mut FromToLines, from: String, prefix: CommonPrefix) {
+    map.entry((from, prefix.label))
+        .or_default()
+        .push(prefix.line_number);
+}
+
+/// Prints `title` followed by one `receiver <- sender: count [line numbers]` row per peer pair.
+fn print_summary(title: &str, map: &FromToLines) {
+    println!("{title}");
+    for ((from, to), lines) in map {
+        println!(" {to:?} <- {from:?}: {} {:?}", lines.len(), lines);
+    }
+}
+
+/// State accumulated while scanning the log.
+#[derive(Default)]
+struct Context {
+    /// Label of the peer that announced a given runtime ID.
+    runtime_id_to_label: HashMap<String, String>,
+    from_to_root_nodes: FromToLines,
+    from_to_inner_nodes: FromToLines,
+    from_to_leaf_nodes: FromToLines,
+    from_to_blocks: FromToLines,
+}
+
+impl Context {
+    fn new() -> Self {
+        Self::default()
+    }
+}
+
+/// A log line of one of the kinds the analyzer understands.
+#[derive(Debug)]
+enum Line {
+    ThisRuntimeId(ThisRuntimeIdLine),
+    ReceivedRootNode(ReceivedRootNodeLine),
+    ReceivedInnerNode(ReceivedInnerNodeLine),
+    ReceivedLeafNode(ReceivedLeafNodeLine),
+    ReceivedBlock(ReceivedBlockLine),
+}
+
+#[derive(Debug)]
+struct ThisRuntimeIdLine {
+    common_prefix: CommonPrefix,
+    this_runtime_id: String,
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+struct ReceivedRootNodeLine {
+    common_prefix: CommonPrefix,
+    that_label: String,
+    hash: String,
+    exchange_id: u32,
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+struct ReceivedInnerNodeLine {
+    common_prefix: CommonPrefix,
+    that_label: String,
+    exchange_id: u32,
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+struct ReceivedLeafNodeLine {
+    common_prefix: CommonPrefix,
+    that_label: String,
+    exchange_id: u32,
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+struct ReceivedBlockLine {
+    common_prefix: CommonPrefix,
+    that_label: String,
+    block_id: String,
+}
+
+/// Utility to analyze network communication from swarm logs.
+#[derive(Debug, Parser)]
+struct Options {
+    /// Logfile to analyze
+    #[arg(short = 'l', long)]
+    logfile: PathBuf,
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+struct CommonPrefix {
+    line_number: u32,
+    label: String,
+    date: NaiveDateTime,
+    log_level: String,
+}
+
+mod parse {
+    use super::*;
+
+    /// Parses one log line, returning `None` when it is not a kind the analyzer understands.
+    ///
+    /// Runtime ID announcements are also stored in `context` so that later lines can name the
+    /// announcing peer by its label.
+    pub(super) fn line(line_number: u32, input: &str, context: &mut Context) -> Option<Line> {
+        // Every parser advances its cursor, so each attempt restarts from the whole line.
+        let mut cursor = input;
+        if let Some(line) = this_runtime_id_line(line_number, &mut cursor) {
+            context.runtime_id_to_label.insert(
+                line.this_runtime_id.clone(),
+                line.common_prefix.label.clone(),
+            );
+            return Some(Line::ThisRuntimeId(line));
+        }
+        cursor = input;
+        if let Some(line) = received_root_node_line(line_number, &mut cursor, context) {
+            return Some(Line::ReceivedRootNode(line));
+        }
+        cursor = input;
+        if let Some(line) = received_inner_node_line(line_number, &mut cursor, context) {
+            return Some(Line::ReceivedInnerNode(line));
+        }
+        cursor = input;
+        if let Some(line) = received_leaf_node_line(line_number, &mut cursor, context) {
+            return Some(Line::ReceivedLeafNode(line));
+        }
+        cursor = input;
+        if let Some(line) = received_block_line(line_number, &mut cursor, context) {
+            return Some(Line::ReceivedBlock(line));
+        }
+        None
+    }
+
+    /// Returns the label recorded for `runtime_id`.
+    ///
+    /// Falls back to the raw ID when the peer's `this_runtime_id=` line has not been seen, so
+    /// that an incomplete log does not abort the analysis.
+    fn that_label(context: &Context, runtime_id: &str) -> String {
+        context
+            .runtime_id_to_label
+            .get(runtime_id)
+            .cloned()
+            .unwrap_or_else(|| runtime_id.to_owned())
+    }
+
+    fn this_runtime_id_line(line_number: u32, s: &mut &str) -> Option<ThisRuntimeIdLine> {
+        let common_prefix = common_prefix(line_number, s)?;
+        find_string("this_runtime_id=", s)?;
+        let this_runtime_id = alphanumeric_string(s)?.into();
+        Some(ThisRuntimeIdLine {
+            common_prefix,
+            this_runtime_id,
+        })
+    }
+
+    fn received_root_node_line(
+        line_number: u32,
+        s: &mut &str,
+        context: &Context,
+    ) -> Option<ReceivedRootNodeLine> {
+        let common_prefix = common_prefix(line_number, s)?;
+        find_string("Received root node", s)?;
+        find_string(" hash=", s)?;
+        let hash = alphanumeric_string(s)?.into();
+        find_string("DebugResponse { exchange_id: ", s)?;
+        let exchange_id = digits(s)?.parse().ok()?;
+        find_string("message_broker{", s)?;
+        let that_runtime_id = alphanumeric_string(s)?;
+        Some(ReceivedRootNodeLine {
+            common_prefix,
+            that_label: that_label(context, that_runtime_id),
+            hash,
+            exchange_id,
+        })
+    }
+
+    fn received_inner_node_line(
+        line_number: u32,
+        s: &mut &str,
+        context: &Context,
+    ) -> Option<ReceivedInnerNodeLine> {
+        let common_prefix = common_prefix(line_number, s)?;
+        find_string("Received ", s)?;
+        digits(s)?;
+        char('/', s)?;
+        digits(s)?;
+        string(" inner nodes:", s)?;
+        find_string("DebugResponse { exchange_id: ", s)?;
+        let exchange_id = digits(s)?.parse().ok()?;
+        find_string("message_broker{", s)?;
+        let that_runtime_id = alphanumeric_string(s)?;
+        Some(ReceivedInnerNodeLine {
+            common_prefix,
+            that_label: that_label(context, that_runtime_id),
+            exchange_id,
+        })
+    }
+
+    fn received_leaf_node_line(
+        line_number: u32,
+        s: &mut &str,
+        context: &Context,
+    ) -> Option<ReceivedLeafNodeLine> {
+        let common_prefix = common_prefix(line_number, s)?;
+        find_string("Received ", s)?;
+        digits(s)?;
+        char('/', s)?;
+        digits(s)?;
+        string(" leaf nodes:", s)?;
+        find_string("DebugResponse { exchange_id: ", s)?;
+        let exchange_id = digits(s)?.parse().ok()?;
+        find_string("message_broker{", s)?;
+        let that_runtime_id = alphanumeric_string(s)?;
+        Some(ReceivedLeafNodeLine {
+            common_prefix,
+            that_label: that_label(context, that_runtime_id),
+            exchange_id,
+        })
+    }
+
+    fn received_block_line(
+        line_number: u32,
+        s: &mut &str,
+        context: &Context,
+    ) -> Option<ReceivedBlockLine> {
+        let common_prefix = common_prefix(line_number, s)?;
+        find_string("Received block :: handle_block{id=", s)?;
+        let block_id = alphanumeric_string(s)?.into();
+        find_string("message_broker{", s)?;
+        let that_runtime_id = alphanumeric_string(s)?;
+        Some(ReceivedBlockLine {
+            common_prefix,
+            that_label: that_label(context, that_runtime_id),
+            block_id,
+        })
+    }
+
+    fn common_prefix(line_number: u32, s: &mut &str) -> Option<CommonPrefix> {
+        let label = label(s)?;
+        white_space(s)?;
+        let date = date(s)?;
+        white_space(s)?;
+        let log_level = alphanumeric_string(s)?.into();
+        white_space(s)?;
+        Some(CommonPrefix {
+            line_number,
+            label,
+            date,
+            log_level,
+        })
+    }
+
+    /// Parses a `YYYY-MM-DDThh:mm:ss.<fraction>Z` timestamp.
+    fn date(s: &mut &str) -> Option<NaiveDateTime> {
+        let year = num_i32(s)?;
+        char('-', s)?;
+        let month = num_u32(s)?;
+        char('-', s)?;
+        let day = num_u32(s)?;
+        char('T', s)?;
+        let hour = num_u32(s)?;
+        char(':', s)?;
+        let minute = num_u32(s)?;
+        char(':', s)?;
+        let second = num_u32(s)?;
+        char('.', s)?;
+        let decimal = digits(s)?;
+        char('Z', s)?;
+
+        // Only microsecond precision is kept: digits past the sixth are ignored.
+        let mut micro = 0;
+        let mut coef = 100_000;
+
+        for d in decimal.chars().take(6) {
+            micro += d.to_digit(10 /* RADIX */)? * coef;
+            coef /= 10;
+        }
+
+        // An out-of-range date or time makes the line unparseable instead of panicking.
+        NaiveDate::from_ymd_opt(year, month, day)?
+            .and_hms_micro_opt(hour, minute, second, micro)
+    }
+
+    pub(super) fn label(s: &mut &str) -> Option<String> {
+        char('[', s)?;
+        let l = alphanumeric_string(s)?;
+        // Padding between the label and the closing bracket is optional.
+        let _ = white_space(s);
+        char(']', s)?;
+        Some(l.into())
+    }
+
+    pub(super) fn num_u32(s: &mut &str) -> Option<u32> {
+        digits(s)?.parse().ok()
+    }
+
+    pub(super) fn num_i32(s: &mut &str) -> Option<i32> {
+        digits(s)?.parse().ok()
+    }
+
+    /// Takes the leading run of ASCII digits, which may be empty.
+    pub(super) fn digits<'a>(s: &mut &'a str) -> Option<&'a str> {
+        take_while(|s| usize::from(s.starts_with(|c: char| c.is_ascii_digit())), s)
+    }
+
+    pub(super) fn is_alphanumeric(c: char) -> bool {
+        c.is_ascii_alphanumeric()
+    }
+
+    pub(super) fn alphanumeric_char(s: &mut &str) -> Option<char> {
+        let c = s.chars().next().filter(|c| is_alphanumeric(*c))?;
+        *s = &s[c.len_utf8()..];
+        Some(c)
+    }
+
+    pub(super) fn alphanumeric_string<'a>(s: &mut &'a str) -> Option<&'a str> {
+        take_one_or_more(|s| alphanumeric_char(s).map(|c| c.len_utf8()), s)
+    }
+
+    /// Consumes `prefix` from the front of `s`, or fails leaving `s` untouched.
+    pub(super) fn string(prefix: &str, s: &mut &str) -> Option<()> {
+        *s = s.strip_prefix(prefix)?;
+        Some(())
+    }
+
+    /// Consumes `c` from the front of `s` and returns its length in bytes.
+    pub(super) fn char(c: char, s: &mut &str) -> Option<usize> {
+        *s = s.strip_prefix(c)?;
+        Some(c.len_utf8())
+    }
+
+    /// Skips past the first occurrence of `prefix` in `s`.
+    fn find_string(prefix: &str, s: &mut &str) -> Option<()> {
+        let start = s.find(prefix)?;
+        *s = &s[start + prefix.len()..];
+        Some(())
+    }
+
+    /// Applies `f` repeatedly and sums its results; `None` if it never succeeded.
+    pub(super) fn count<F>(f: F, s: &mut &str) -> Option<usize>
+    where
+        F: Fn(&mut &str) -> Option<usize>,
+    {
+        let mut total: Option<usize> = None;
+        while let Some(n) = f(s) {
+            total = Some(total.unwrap_or(0) + n);
+        }
+        total
+    }
+
+    pub(super) fn take_one_or_more<'a, F>(f: F, s: &mut &'a str) -> Option<&'a str>
+    where
+        F: Fn(&mut &str) -> Option<usize>,
+    {
+        let start: &'a str = *s;
+        let mut probe = start;
+        let len = count(f, &mut probe)?;
+        let (parsed, rest) = start.split_at(len);
+        *s = rest;
+        Some(parsed)
+    }
+
+    pub(super) fn take_while<'a, F>(f: F, s: &mut &'a str) -> Option<&'a str>
+    where
+        F: Fn(&str) -> usize,
+    {
+        let start: &'a str = *s;
+        let mut rest = start;
+        loop {
+            let n = f(rest);
+            if n == 0 {
+                break;
+            }
+            rest = &rest[n..];
+        }
+
+        *s = rest;
+        Some(&start[..start.len() - rest.len()])
+    }
+
+    pub(super) fn white_space(s: &mut &str) -> Option<()> {
+        count(|s| char(' ', s), s).map(|_| ())
+    }
+}
diff --git a/utils/swarm/src/main.rs b/utils/swarm/src/main.rs
index 26f48cc82..58106a550 100644
--- a/utils/swarm/src/main.rs
+++ b/utils/swarm/src/main.rs
@@ -91,6 +91,9 @@ fn build() -> Result<()> {
         .arg("--package")
         .arg("ouisync-cli")
         .arg("--release")
+        // Adds debugging payload to the messages.
+        .arg("--features")
+        .arg("ouisync-lib/analyze-protocol")
         .status()?;
 
     if !status.success() {