diff --git a/Cargo.lock b/Cargo.lock index d7bdc7e..e8893e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,16 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" +dependencies = [ + "lazy_static", + "regex", +] + [[package]] name = "addr2line" version = "0.21.0" @@ -457,11 +467,9 @@ dependencies = [ "chrono", "clap", "clickhouse", - "env_logger", "futures 0.3.28", "hyper", "hyper-rustls", - "log", "primitive-types", "prost", "prost-types", @@ -475,22 +483,14 @@ dependencies = [ "tokio-retry", "tokio-stream", "tonic", + "tracing", + "tracing-core", + "tracing-stackdriver", + "tracing-subscriber", + "tracing-test", "url", ] -[[package]] -name = "env_logger" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" -dependencies = [ - "humantime", - "is-terminal", - "log", - "regex", - "termcolor", -] - [[package]] name = "equivalent" version = "1.0.1" @@ -789,12 +789,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "0.14.27" @@ -933,17 +927,6 @@ dependencies = [ "hashbrown 0.14.0", ] -[[package]] -name = "is-terminal" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" -dependencies = [ - "hermit-abi", - "rustix", - "windows-sys", -] - [[package]] name = "itertools" version = "0.10.5" @@ -1022,6 +1005,15 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.2" @@ -1084,6 +1076,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -1189,6 +1191,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "pad" version = "0.1.6" @@ -1483,8 +1491,17 @@ checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.3.8", + "regex-syntax 0.7.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1495,9 +1512,15 @@ checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.7.5" @@ -1593,6 +1616,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" +[[package]] +name = "ryu" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" + [[package]] name = "schannel" version = "0.1.22" @@ -1684,6 +1713,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "serde_json" +version = "1.0.107" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1855,15 +1904,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "termcolor" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" -dependencies = [ - "winapi-util", -] - [[package]] name = "thiserror" version = "1.0.48" @@ -1884,6 +1924,16 @@ dependencies = [ "syn 2.0.31", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.1.45" @@ -1902,8 +1952,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" dependencies = [ "deranged", + "itoa", "serde", "time-core", + "time-macros", ] [[package]] @@ -1912,6 +1964,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +[[package]] +name = "time-macros" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" +dependencies = [ + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2135,6 +2196,87 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-stackdriver" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac98e757a488cc11d6a84a52d223b970e11c99e5158cc2ff07b8544ccc260078" +dependencies = [ + "Inflector", + "serde", + "serde_json", + "thiserror", + "time 0.3.28", + "tracing-core", + "tracing-subscriber", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", + "tracing-serde", +] + +[[package]] +name = "tracing-test" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a2c0ff408fe918a94c428a3f2ad04e4afd5c95bbc08fcf868eff750c15728a4" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bc1c4f8e2e73a977812ab339d503e6feeb92700f6d07a6de4d321522d5c08" +dependencies = [ + "lazy_static", + "quote", + "syn 1.0.109", ] [[package]] @@ -2211,6 +2353,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -2330,15 +2478,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 9e0cbf4..247de72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,11 +29,14 @@ strum_macros = "0.25" clap = { version = "4.3.21", features = ["derive"] } url = "2.4.0" hyper-rustls = "0.24.1" -env_logger = "0.10.0" -log = "0.4.20" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } +tracing = "0.1.37" +tracing-stackdriver = "0.7.2" +tracing-core = "0.1.31" [dev-dependencies] clickhouse = { version = "0.11.5", features = ["test-util"] } +tracing-test = "0.2.4" [patch.crates-io] diff --git a/src/loader.rs b/src/loader.rs index 7bdfce7..4135940 100644 --- a/src/loader.rs +++ b/src/loader.rs @@ -7,7 +7,7 @@ use clickhouse::{ inserter::{Inserter, RowInserter, SchemaInserter}, Client, Row, }; -use log::{debug, info, warn}; +use tracing::{debug, info, warn}; use prost::Message; use serde::{Deserialize, Serialize}; use substreams_database_change::pb::database::{ @@ -127,6 +127,7 @@ impl DatabaseLoader { async fn process_final_blocks(&mut self, data: BlockScopedData) -> Result<(), ElricError> { let output = data.output.as_ref().unwrap().map_output.as_ref().unwrap(); let database_changes = DatabaseChanges::decode(output.value.as_slice())?; + let changes_length = database_changes.table_changes.len(); let splitted_inserts = split_table_changes(database_changes.table_changes); @@ -162,6 +163,8 @@ impl DatabaseLoader { let block_num = data.clock.as_ref().unwrap().number; info!( + block_num, + changes_length, "Block #{} - Payload {} ({} bytes)", block_num, output.type_url.replace("type.googleapis.com/", ""), @@ -172,7 +175,7 @@ impl DatabaseLoader { } pub fn process_block_undo_signal(&mut self, block_num_signal: u64) { - warn!("Processing undo signal for block {}", block_num_signal); + warn!(undo_block_num = block_num_signal, "Processing undo signal for block {}", block_num_signal); let final_block_index = self .buffer .iter() @@ -181,8 +184,11 @@ impl DatabaseLoader { .map(|i| self.buffer.len() - i); if let Some(index) = final_block_index { - debug!("final_block_index drain: {:?}", index..); - self.buffer.drain(index..); + let drained = self.buffer.drain(index..); + for d in drained { + let block_num = d.clock.as_ref().unwrap().number; + debug!(block_num, ?d, "New block drained"); + } } } @@ -249,6 +255,7 @@ mod tests { use prost_types::Any; use serde::Deserialize; use substreams_database_change::pb::database::{DatabaseChanges, Field, TableChange}; + use tracing_test::traced_test; use crate::{ loader::BUFFER_LEN, @@ -262,10 +269,6 @@ mod tests { use super::DatabaseLoader; use anyhow::Result; - fn init() { - let _ = env_logger::builder().is_test(true).try_init(); - } - #[tokio::test] async fn test_undo_block_signal() { let mut buffer = VecDeque::new(); @@ -353,8 +356,8 @@ mod tests { } #[tokio::test] + #[traced_test] async fn test_process_data() -> Result<()> { - init(); let mut mock = test::Mock::new(); mock.non_exhaustive(); let client = Client::default().with_url(mock.url()); diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..a795cc2 --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,43 @@ +use tracing_subscriber::{registry::LookupSpan, Layer}; + +pub struct LogConfig { + is_prod: bool, +} + +impl LogConfig { + /// Create a new `LogConfig` instance. + /// This config will use the Stackdriver logging layer if the + /// `K_SERVICE` or `KUBERNETES_SERVICE_HOST` environment variables are set. + /// Otherwise, it will use the stdout layer. + /// + /// How to use: + /// ``` + /// use logging::LogConfig; + /// use tracing_subscriber::{prelude::*, Registry}; + /// + /// let subscriber = Registry::default(); + /// let subscriber = subscriber.with(cfg.layer()); + /// tracing::subscriber::set_global_default(subscriber).unwrap(); + /// ``` + pub fn new() -> Self { + let k_service = std::env::var("K_SERVICE"); + let kubernetes_service = std::env::var("KUBERNETES_SERVICE_HOST"); + Self { + is_prod: k_service.is_ok() || kubernetes_service.is_ok(), + } + } + + pub fn layer(&self) -> Box + Send + Sync + 'static> + where + S: tracing_core::Subscriber, + for<'a> S: LookupSpan<'a>, + { + if self.is_prod { + let stackdriver = tracing_stackdriver::layer(); + Box::new(stackdriver) + } else { + let stdout_log = tracing_subscriber::fmt::layer(); + Box::new(stdout_log) + } + } +} diff --git a/src/main.rs b/src/main.rs index 590db24..f3fb4c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,8 +5,11 @@ use futures03::future::join_all; use futures03::StreamExt; use hyper_rustls::HttpsConnectorBuilder; use loader::Cursor; -use log::{error, info}; +use tracing::{error, info}; +use logging::LogConfig; use pb::sf::substreams::v1::Package; +use tracing_core::LevelFilter; +use tracing_subscriber::{prelude::*, Registry, EnvFilter}; use url::Url; use prost::Message; @@ -26,6 +29,7 @@ use crate::table_info::{get_columns, get_table_information, DynamicTable}; mod fixed_string; mod loader; +mod logging; mod pb; mod substreams; mod substreams_stream; @@ -90,7 +94,14 @@ pub enum ElricError { #[tokio::main] async fn main() -> Result<(), Error> { - env_logger::init(); + let subscriber = Registry::default(); + let cfg = LogConfig::new(); + let subscriber = subscriber.with(cfg.layer()).with( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ); + tracing::subscriber::set_global_default(subscriber).expect("Could not set up global logger"); let cli = Cli::parse(); @@ -216,8 +227,7 @@ async fn run( loader.process_block_undo_signal(block_num_signal); } Some(Err(err)) => { - error!("Stream terminated with error"); - error!("{:?}", err); + error!(%err, "Stream terminated with error"); exit(1); } }, diff --git a/src/substreams_stream.rs b/src/substreams_stream.rs index dc14ac6..bce348c 100644 --- a/src/substreams_stream.rs +++ b/src/substreams_stream.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, Error}; use async_stream::try_stream; use futures03::{Stream, StreamExt}; -use log::{error, info, warn}; +use tracing::{error, info, warn}; use std::{ pin::Pin, sync::Arc,