diff --git a/Cargo.lock b/Cargo.lock index 354c0a50f..991c3919b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5604,8 +5604,6 @@ dependencies = [ "async-trait", "chrono", "clap", - "opentelemetry", - "opentelemetry-otlp", "opentelemetry-proto", "portpicker", "pretty_assertions", @@ -5620,7 +5618,6 @@ dependencies = [ "tokio-stream", "tonic", "tracing", - "tracing-opentelemetry", "tracing-subscriber", ] @@ -5703,9 +5700,6 @@ dependencies = [ "crossbeam-channel", "futures", "hyper", - "opentelemetry", - "opentelemetry-otlp", - "opentelemetry-proto", "portpicker", "prost-types", "rmp-serde", @@ -5722,8 +5716,6 @@ dependencies = [ "tonic", "tower", "tracing", - "tracing-opentelemetry", - "tracing-subscriber", "uuid", "wasi-common", "wasmtime", diff --git a/codegen/src/lib.rs b/codegen/src/lib.rs index eddf72e07..ee4c37c5c 100644 --- a/codegen/src/lib.rs +++ b/codegen/src/lib.rs @@ -3,20 +3,20 @@ mod next; #[cfg(feature = "frameworks")] mod shuttle_main; -use proc_macro::TokenStream; -use proc_macro_error::proc_macro_error; - #[cfg(feature = "frameworks")] -#[proc_macro_error] +#[proc_macro_error::proc_macro_error] #[proc_macro_attribute] -pub fn main(attr: TokenStream, item: TokenStream) -> TokenStream { +pub fn main( + attr: proc_macro::TokenStream, + item: proc_macro::TokenStream, +) -> proc_macro::TokenStream { shuttle_main::r#impl(attr, item) } #[cfg(feature = "next")] -#[proc_macro_error] +#[proc_macro_error::proc_macro_error] #[proc_macro] -pub fn app(item: TokenStream) -> TokenStream { +pub fn app(item: proc_macro::TokenStream) -> proc_macro::TokenStream { use next::App; use syn::{parse_macro_input, File}; diff --git a/codegen/src/shuttle_main/mod.rs b/codegen/src/shuttle_main/mod.rs index 870827b36..a2f9e278d 100644 --- a/codegen/src/shuttle_main/mod.rs +++ b/codegen/src/shuttle_main/mod.rs @@ -1,5 +1,4 @@ use proc_macro::TokenStream; -use proc_macro2::Punct; use proc_macro2::Span; use proc_macro_error::emit_error; use quote::{quote, ToTokens}; @@ -9,11 +8,10 @@ use syn::{ Signature, Stmt, Token, Type, TypePath, }; -pub(crate) fn r#impl(attr: TokenStream, item: TokenStream) -> TokenStream { - let args = parse_macro_input!(attr as MainArgs); +pub(crate) fn r#impl(_attr: TokenStream, item: TokenStream) -> TokenStream { let mut fn_decl = parse_macro_input!(item as ItemFn); - let loader = Loader::from_item_fn(&mut fn_decl, args); + let loader = Loader::from_item_fn(&mut fn_decl); quote! { #[tokio::main] @@ -32,7 +30,6 @@ struct Loader { fn_ident: Ident, fn_inputs: Vec, fn_return: TypePath, - fn_args: MainArgs, } #[derive(Debug, PartialEq)] @@ -87,7 +84,7 @@ impl Parse for BuilderOption { } impl Loader { - pub(crate) fn from_item_fn(item_fn: &mut ItemFn, args: MainArgs) -> Option { + pub(crate) fn from_item_fn(item_fn: &mut ItemFn) -> Option { // rename function to allow any name, such as 'main' item_fn.sig.ident = Ident::new( &format!("__shuttle_{}", item_fn.sig.ident), @@ -124,7 +121,6 @@ impl Loader { fn_ident: item_fn.sig.ident.clone(), fn_inputs: inputs, fn_return: type_path, - fn_args: args, }) } } @@ -256,16 +252,6 @@ impl ToTokens for Loader { (None, None) }; - let inject_tracing_layer = match self.fn_args.tracing_args { - None => quote! {}, - Some(ref args) => { - let layer_fn = &args.value; - quote! { - let registry = registry.with(#layer_fn()); - } - } - }; - let loader = quote! { async fn loader( mut #factory_ident: shuttle_runtime::ProvisionerFactory, @@ -274,49 +260,7 @@ impl ToTokens for Loader { deployment_id: String, ) -> #return_type { use shuttle_runtime::Context; - use shuttle_runtime::tracing_subscriber::prelude::*; - use shuttle_runtime::opentelemetry_otlp::WithExportConfig; #extra_imports - - let filter_layer = shuttle_runtime::tracing_subscriber::EnvFilter::try_from_default_env() - .or_else(|_| shuttle_runtime::tracing_subscriber::EnvFilter::try_new("info")) - .unwrap(); - - let tracer = shuttle_runtime::opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - shuttle_runtime::opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(logger_uri), - ) - .with_trace_config( - shuttle_runtime::opentelemetry::sdk::trace::config() - .with_resource( - shuttle_runtime::opentelemetry::sdk::Resource::new( - vec![ - shuttle_runtime::opentelemetry::KeyValue::new( - "service.name", - "shuttle-runtime", - ), - shuttle_runtime::opentelemetry::KeyValue::new( - "deployment_id", - deployment_id, - ) - ] - ) - ), - ) - .install_batch(shuttle_runtime::opentelemetry::runtime::Tokio) - .unwrap(); - let otel_layer = shuttle_runtime::tracing_opentelemetry::layer().with_tracer(tracer); - - let registry = shuttle_runtime::tracing_subscriber::registry() - .with(filter_layer) - .with(otel_layer); - - #inject_tracing_layer - - registry.init(); #vars #(let #fn_inputs = shuttle_runtime::get_resource( #fn_inputs_builder::new()#fn_inputs_builder_options, @@ -335,64 +279,6 @@ impl ToTokens for Loader { } } -/// Configuration options specified by the user. -#[derive(Debug, Default)] -struct MainArgs { - tracing_args: Option, -} - -impl Parse for MainArgs { - fn parse(input: syn::parse::ParseStream) -> syn::Result { - // Start with empty arguments - let mut args = Self::default(); - - // If the user didn't pass any arguments, this loop is a no-op. - // Otherwise, any argument starts with some identifier. If we find one, we continue to - // parse the input according to the name of the identifier. - while let Ok(ident) = input.parse::() { - match ident.to_string().as_str() { - "tracing_layer" => { - let equal_sign = input.parse::()?; - - if equal_sign.as_char() != '=' { - emit_error!(ident, "must be followed by a `=`."); - } - - let value = input.parse()?; - - args.tracing_args = Some(TracingAttr { - _attr: ident, - _equal_sign: equal_sign, - value, - }); - } - - attr_ident => emit_error!(attr_ident, "Unknown attribute."), - }; - } - - Ok(args) - } -} - -/// An attribute to customize the tracing registry setup by shuttle -#[derive(Debug)] -struct TracingAttr { - _attr: Ident, - _equal_sign: Punct, - value: Path, -} - -impl Parse for TracingAttr { - fn parse(input: syn::parse::ParseStream) -> syn::Result { - Ok(Self { - _attr: input.parse()?, - _equal_sign: input.parse()?, - value: input.parse()?, - }) - } -} - #[cfg(test)] mod tests { use pretty_assertions::assert_eq; @@ -407,7 +293,7 @@ mod tests { async fn simple() -> ShuttleAxum {} ); - let actual = Loader::from_item_fn(&mut input, Default::default()).unwrap(); + let actual = Loader::from_item_fn(&mut input).unwrap(); let expected_ident: Ident = parse_quote!(__shuttle_simple); let expected_return: TypePath = parse_quote!(ShuttleAxum); @@ -422,7 +308,7 @@ mod tests { async fn main() -> ShuttleAxum {} ); - let actual = Loader::from_item_fn(&mut input, Default::default()).unwrap(); + let actual = Loader::from_item_fn(&mut input).unwrap(); let expected_ident: Ident = parse_quote!(__shuttle_main); assert_eq!(actual.fn_ident, expected_ident); @@ -434,7 +320,6 @@ mod tests { fn_ident: parse_quote!(simple), fn_inputs: Vec::new(), fn_return: parse_quote!(ShuttleSimple), - fn_args: Default::default(), }; let actual = quote!(#input); @@ -446,47 +331,6 @@ mod tests { deployment_id: String, ) -> ShuttleSimple { use shuttle_runtime::Context; - use shuttle_runtime::tracing_subscriber::prelude::*; - use shuttle_runtime::opentelemetry_otlp::WithExportConfig; - - let filter_layer = shuttle_runtime::tracing_subscriber::EnvFilter::try_from_default_env() - .or_else(|_| shuttle_runtime::tracing_subscriber::EnvFilter::try_new("info")) - .unwrap(); - - let tracer = shuttle_runtime::opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - shuttle_runtime::opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(logger_uri), - ) - .with_trace_config( - shuttle_runtime::opentelemetry::sdk::trace::config() - .with_resource( - shuttle_runtime::opentelemetry::sdk::Resource::new( - vec![ - shuttle_runtime::opentelemetry::KeyValue::new( - "service.name", - "shuttle-runtime", - ), - shuttle_runtime::opentelemetry::KeyValue::new( - "deployment_id", - deployment_id, - ) - ] - ) - ), - ) - .install_batch(shuttle_runtime::opentelemetry::runtime::Tokio) - .unwrap(); - let otel_layer = shuttle_runtime::tracing_opentelemetry::layer().with_tracer(tracer); - - let registry = shuttle_runtime::tracing_subscriber::registry() - .with(filter_layer) - .with(otel_layer); - - registry.init(); - simple().await } }; @@ -500,7 +344,7 @@ mod tests { async fn complex(#[shuttle_shared_db::Postgres] pool: PgPool) -> ShuttleTide {} ); - let actual = Loader::from_item_fn(&mut input, Default::default()).unwrap(); + let actual = Loader::from_item_fn(&mut input).unwrap(); let expected_ident: Ident = parse_quote!(__shuttle_complex); let expected_inputs: Vec = vec![Input { ident: parse_quote!(pool), @@ -546,7 +390,6 @@ mod tests { }, ], fn_return: parse_quote!(ShuttleComplex), - fn_args: Default::default(), }; let actual = quote!(#input); @@ -558,48 +401,7 @@ mod tests { deployment_id: String, ) -> ShuttleComplex { use shuttle_runtime::Context; - use shuttle_runtime::tracing_subscriber::prelude::*; - use shuttle_runtime::opentelemetry_otlp::WithExportConfig; use shuttle_runtime::{Factory, ResourceBuilder}; - - let filter_layer = shuttle_runtime::tracing_subscriber::EnvFilter::try_from_default_env() - .or_else(|_| shuttle_runtime::tracing_subscriber::EnvFilter::try_new("info")) - .unwrap(); - - let tracer = shuttle_runtime::opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - shuttle_runtime::opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(logger_uri), - ) - .with_trace_config( - shuttle_runtime::opentelemetry::sdk::trace::config() - .with_resource( - shuttle_runtime::opentelemetry::sdk::Resource::new( - vec![ - shuttle_runtime::opentelemetry::KeyValue::new( - "service.name", - "shuttle-runtime", - ), - shuttle_runtime::opentelemetry::KeyValue::new( - "deployment_id", - deployment_id, - ) - ] - ) - ), - ) - .install_batch(shuttle_runtime::opentelemetry::runtime::Tokio) - .unwrap(); - let otel_layer = shuttle_runtime::tracing_opentelemetry::layer().with_tracer(tracer); - - let registry = shuttle_runtime::tracing_subscriber::registry() - .with(filter_layer) - .with(otel_layer); - - registry.init(); - let pool = shuttle_runtime::get_resource( shuttle_shared_db::Postgres::new(), &mut factory, @@ -653,7 +455,7 @@ mod tests { } ); - let actual = Loader::from_item_fn(&mut input, Default::default()).unwrap(); + let actual = Loader::from_item_fn(&mut input).unwrap(); let expected_ident: Ident = parse_quote!(__shuttle_complex); let mut expected_inputs: Vec = vec![Input { ident: parse_quote!(pool), @@ -690,7 +492,6 @@ mod tests { }, }], fn_return: parse_quote!(ShuttleComplex), - fn_args: Default::default(), }; input.fn_inputs[0] @@ -713,48 +514,7 @@ mod tests { deployment_id: String, ) -> ShuttleComplex { use shuttle_runtime::Context; - use shuttle_runtime::tracing_subscriber::prelude::*; - use shuttle_runtime::opentelemetry_otlp::WithExportConfig; use shuttle_runtime::{Factory, ResourceBuilder}; - - let filter_layer = shuttle_runtime::tracing_subscriber::EnvFilter::try_from_default_env() - .or_else(|_| shuttle_runtime::tracing_subscriber::EnvFilter::try_new("info")) - .unwrap(); - - let tracer = shuttle_runtime::opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - shuttle_runtime::opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(logger_uri), - ) - .with_trace_config( - shuttle_runtime::opentelemetry::sdk::trace::config() - .with_resource( - shuttle_runtime::opentelemetry::sdk::Resource::new( - vec![ - shuttle_runtime::opentelemetry::KeyValue::new( - "service.name", - "shuttle-runtime", - ), - shuttle_runtime::opentelemetry::KeyValue::new( - "deployment_id", - deployment_id, - ) - ] - ) - ), - ) - .install_batch(shuttle_runtime::opentelemetry::runtime::Tokio) - .unwrap(); - let otel_layer = shuttle_runtime::tracing_opentelemetry::layer().with_tracer(tracer); - - let registry = shuttle_runtime::tracing_subscriber::registry() - .with(filter_layer) - .with(otel_layer); - - registry.init(); - let vars = std::collections::HashMap::from_iter(factory.get_secrets().await?.into_iter().map(|(key, value)| (format!("secrets.{}", key), value))); let pool = shuttle_runtime::get_resource ( shuttle_shared_db::Postgres::new().size(&shuttle_runtime::strfmt("10Gb", &vars)?).public(false), diff --git a/codegen/tests/integration/custom_tracing_layer.rs b/codegen/tests/integration/custom_tracing_layer.rs deleted file mode 100644 index a3e6cc393..000000000 --- a/codegen/tests/integration/custom_tracing_layer.rs +++ /dev/null @@ -1,59 +0,0 @@ -use reqwest::Client; -use serde_json::Value as JsonValue; -use shuttle_common_tests::cargo_shuttle::cargo_shuttle_run; -use tokio::time::Duration; - -#[tokio::test] -async fn custom_tracing_layer() { - // Spin up the example - let path = "../examples/tracing/axum-logs-endpoint"; - let url = cargo_shuttle_run(path, false).await; - - // Prepare URLs - let get_url = format!("{url}/logs/3"); - let post_url1 = format!("{url}/message/hello"); - let post_url2 = format!("{url}/message/world"); - let post_url3 = format!("{url}/message/how%20are%20you%3F"); - - let client1 = Client::new(); - let client2 = client1.clone(); - - // Send the initial GET request - let get = tokio::spawn(async move { - client1 - .get(get_url) - .send() - .await - .unwrap() - .json::>() - .await - .unwrap() - }); - - // Wait for the request to send - tokio::time::sleep(Duration::from_millis(500)).await; - - // Send some messages - tokio::spawn(async move { - client2.post(post_url1).send().await.unwrap(); - client2.post(post_url2).send().await.unwrap(); - client2.post(post_url3).send().await.unwrap(); - }); - - // Receive messages and validate them - let result = get.await.unwrap(); - - assert_eq!(result.len(), 3); - assert_eq!( - result[0]["fields"]["message"].as_str().unwrap(), - "\"hello\"" - ); - assert_eq!( - result[1]["fields"]["message"].as_str().unwrap(), - "\"world\"" - ); - assert_eq!( - result[2]["fields"]["message"].as_str().unwrap(), - "\"how are you?\"" - ); -} diff --git a/codegen/tests/integration/main.rs b/codegen/tests/integration/main.rs deleted file mode 100644 index 1e0cd516a..000000000 --- a/codegen/tests/integration/main.rs +++ /dev/null @@ -1 +0,0 @@ -mod custom_tracing_layer; diff --git a/logger/Cargo.toml b/logger/Cargo.toml index 3901a60a9..b2e096795 100644 --- a/logger/Cargo.toml +++ b/logger/Cargo.toml @@ -26,7 +26,6 @@ tokio-stream = { workspace = true } tonic = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["default"] } -tracing-opentelemetry = { workspace = true } [dependencies.shuttle-common] workspace = true @@ -40,5 +39,3 @@ portpicker = { workspace = true } pretty_assertions = { workspace = true } serde_json = { workspace = true } shuttle-common-tests = { workspace = true } -opentelemetry-otlp = { workspace = true } -opentelemetry = { workspace = true } diff --git a/logger/migrations/0000_init.sql b/logger/migrations/0000_init.sql index 123488a95..cde6f37a8 100644 --- a/logger/migrations/0000_init.sql +++ b/logger/migrations/0000_init.sql @@ -1,7 +1,6 @@ CREATE TABLE IF NOT EXISTS logs ( deployment_id TEXT, -- The deployment that this log line pertains to. shuttle_service_name TEXT, -- The shuttle service which created this log. - timestamp INTEGER, -- Unix epoch timestamp. - level INTEGER, -- The log level - fields TEXT -- Log fields object. + tx_timestamp DATETIME, -- Unix epoch timestamp. + data BLOB -- Log fields object. ); diff --git a/logger/src/dal.rs b/logger/src/dal.rs index e41f7d1c4..9f6be407b 100644 --- a/logger/src/dal.rs +++ b/logger/src/dal.rs @@ -3,13 +3,8 @@ use std::{path::Path, str::FromStr, time::SystemTime}; use async_broadcast::{broadcast, Sender}; use async_trait::async_trait; use chrono::NaiveDateTime; -use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span}; use prost_types::Timestamp; -use serde_json::Value; -use shuttle_common::{ - backends::tracing::from_any_value_kv_to_serde_json_map, tracing::MESSAGE_KEY, -}; -use shuttle_proto::logger::{self, LogItem}; +use shuttle_proto::logger::{LogItem, LogLine}; use sqlx::{ migrate::{MigrateDatabase, Migrator}, sqlite::{SqliteConnectOptions, SqliteJournalMode}, @@ -35,6 +30,7 @@ pub trait Dal { async fn get_logs(&self, deployment_id: String) -> Result, DalError>; } +#[derive(Clone)] pub struct Sqlite { pool: SqlitePool, tx: Sender>, @@ -85,13 +81,14 @@ impl Sqlite { tokio::spawn(async move { while let Ok(logs) = rx.recv().await { - let mut builder = QueryBuilder::new("INSERT INTO logs (deployment_id, shuttle_service_name, timestamp, level, fields)"); + let mut builder = QueryBuilder::new( + "INSERT INTO logs (deployment_id, shuttle_service_name, data, tx_timestamp)", + ); builder.push_values(logs, |mut b, log| { b.push_bind(log.deployment_id) .push_bind(log.shuttle_service_name) - .push_bind(log.timestamp) - .push_bind(log.level) - .push_bind(log.fields); + .push_bind(log.data) + .push_bind(log.tx_timestamp); }); let query = builder.build(); @@ -114,7 +111,7 @@ impl Sqlite { impl Dal for Sqlite { async fn get_logs(&self, deployment_id: String) -> Result, DalError> { let result = - sqlx::query_as("SELECT * FROM logs WHERE deployment_id = ? ORDER BY timestamp") + sqlx::query_as("SELECT * FROM logs WHERE deployment_id = ? ORDER BY tx_timestamp") .bind(deployment_id) .fetch_all(&self.pool) .await?; @@ -127,170 +124,49 @@ impl Dal for Sqlite { pub struct Log { pub(crate) deployment_id: String, pub(crate) shuttle_service_name: String, - pub(crate) timestamp: DateTime, - pub(crate) level: LogLevel, - pub(crate) fields: Value, -} - -#[derive(Clone, Debug, sqlx::Type)] -pub enum LogLevel { - Trace, - Debug, - Info, - Warn, - Error, -} - -impl FromStr for LogLevel { - type Err = DalError; - - fn from_str(value: &str) -> Result { - match value { - "TRACE" => Ok(Self::Trace), - "DEBUG" => Ok(Self::Debug), - "INFO" => Ok(Self::Info), - "WARN" => Ok(Self::Warn), - "ERROR" => Ok(Self::Error), - other => Err(DalError::Parsing(format!("invalid log level: {other}"))), - } - } -} - -impl From for logger::LogLevel { - fn from(level: LogLevel) -> Self { - match level { - LogLevel::Trace => Self::Trace, - LogLevel::Debug => Self::Debug, - LogLevel::Info => Self::Info, - LogLevel::Warn => Self::Warn, - LogLevel::Error => Self::Error, - } - } + pub(crate) tx_timestamp: DateTime, + pub(crate) data: Vec, } impl Log { - /// Try to get a log from an OTLP [ResourceSpans] - pub fn try_from_scope_span(resource_spans: ResourceSpans) -> Option> { - let ResourceSpans { - resource, - scope_spans, - schema_url: _, - } = resource_spans; - - let fields = from_any_value_kv_to_serde_json_map(resource?.attributes); - let shuttle_service_name = fields.get("service.name")?.as_str()?.to_string(); - // TODO: should this be named "deployment.id" to conform to otlp standard? - let deployment_id = fields - .get("deployment_id") - .map(|v| { - v.as_str() - .expect("expected to have a string value for deployment_id key") - }) - .map(|inner| inner.to_string()); - - let logs = scope_spans - .into_iter() - .flat_map(|scope_spans| { - let ScopeSpans { - spans, - schema_url: _, - .. - } = scope_spans; - - let events: Vec<_> = spans - .into_iter() - .flat_map(|span| { - Self::try_from_span(span, &shuttle_service_name, deployment_id.clone()) - }) - .flatten() - .collect(); - - Some(events) - }) - .flatten() - .collect(); - - Some(logs) - } - - /// Try to get self from an OTLP [Span]. Also enrich it with the shuttle service name and deployment id. - fn try_from_span( - span: Span, - shuttle_service_name: &str, - deployment_id: Option, - ) -> Option> { - // If we didn't find the id in the resource span, check the inner spans. - let mut span_fields = from_any_value_kv_to_serde_json_map(span.attributes); - let deployment_id = deployment_id.or(span_fields - .get("deployment_id")? - .as_str() - .map(|inner| inner.to_string()))?; - let mut logs: Vec = span - .events - .into_iter() - .flat_map(|event| { - let message = event.name; - - let mut fields = from_any_value_kv_to_serde_json_map(event.attributes); - fields.insert(MESSAGE_KEY.to_string(), message.into()); - - // Since we store the "level" in the level column in the database, we remove it - // from the event fields so it is not duplicated there. - // Note: this should never fail, a tracing event should always have a level. - let level = fields.remove("level")?; - - let naive = NaiveDateTime::from_timestamp_opt( - (event.time_unix_nano / 1_000_000_000) - .try_into() - .unwrap_or_default(), - (event.time_unix_nano % 1_000_000_000) as u32, - ) - .unwrap_or_default(); - - Some(Log { - shuttle_service_name: shuttle_service_name.to_string(), - deployment_id: deployment_id.clone(), - timestamp: DateTime::from_utc(naive, Utc), - level: level.as_str()?.parse().ok()?, - fields: Value::Object(fields), - }) - }) - .collect(); - - span_fields.insert( - MESSAGE_KEY.to_string(), - format!("[span] {}", span.name).into(), - ); - - logs.push(Log { - shuttle_service_name: shuttle_service_name.to_string(), - deployment_id, - timestamp: DateTime::from_utc( + pub(crate) fn from_log_item(log: LogItem) -> Option { + let log_line = log.log_line?; + let timestamp = log_line.tx_timestamp.clone().unwrap_or_default(); + Some(Log { + deployment_id: log.deployment_id, + shuttle_service_name: log_line.service_name, + tx_timestamp: DateTime::from_utc( NaiveDateTime::from_timestamp_opt( - (span.start_time_unix_nano / 1_000_000_000) - .try_into() - .unwrap_or_default(), - (span.start_time_unix_nano % 1_000_000_000) as u32, + timestamp.seconds, + timestamp.nanos.try_into().unwrap_or_default(), ) .unwrap_or_default(), Utc, ), - // Span level doesn't exist for opentelemetry spans, so this info is not relevant. - level: LogLevel::Info, - fields: Value::Object(span_fields), - }); - - Some(logs) + data: log_line.data, + }) } } impl From for LogItem { fn from(log: Log) -> Self { LogItem { + deployment_id: log.deployment_id, + log_line: Some(LogLine { + service_name: log.shuttle_service_name, + tx_timestamp: Some(Timestamp::from(SystemTime::from(log.tx_timestamp))), + data: log.data, + }), + } + } +} + +impl From for LogLine { + fn from(log: Log) -> Self { + LogLine { service_name: log.shuttle_service_name, - timestamp: Some(Timestamp::from(SystemTime::from(log.timestamp))), - level: logger::LogLevel::from(log.level) as i32, - fields: serde_json::to_vec(&log.fields).unwrap_or_default(), + tx_timestamp: Some(Timestamp::from(SystemTime::from(log.tx_timestamp))), + data: log.data, } } } diff --git a/logger/src/lib.rs b/logger/src/lib.rs index fc7237e3e..b662822c6 100644 --- a/logger/src/lib.rs +++ b/logger/src/lib.rs @@ -1,11 +1,12 @@ use async_broadcast::Sender; use async_trait::async_trait; -use dal::{Dal, DalError, Log}; -use opentelemetry_proto::tonic::collector::trace::v1::{ - trace_service_server::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, -}; +use dal::Log; +use dal::{Dal, DalError}; use shuttle_common::{backends::auth::VerifyClaim, claims::Scope}; -use shuttle_proto::logger::{logger_server::Logger, LogItem, LogsRequest, LogsResponse}; +use shuttle_proto::logger::LogLine; +use shuttle_proto::logger::{ + logger_server::Logger, LogsRequest, LogsResponse, StoreLogsRequest, StoreLogsResponse, +}; use thiserror::Error; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -15,6 +16,7 @@ pub mod args; mod dal; pub use dal::Sqlite; +use tracing::error; /// A wrapper to capture any error possible with this service #[derive(Error, Debug)] @@ -29,44 +31,6 @@ impl From for Status { } } -pub struct ShuttleLogsOtlp { - tx: Sender>, -} - -impl ShuttleLogsOtlp { - pub fn new(tx: Sender>) -> Self { - Self { tx } - } -} - -#[async_trait] -impl TraceService for ShuttleLogsOtlp { - async fn export( - &self, - request: Request, - ) -> std::result::Result, tonic::Status> { - let request = request.into_inner(); - - let logs: Vec<_> = request - .resource_spans - .into_iter() - .flat_map(Log::try_from_scope_span) - .flatten() - .collect(); - - // TODO: consider returning different response for this case. - if !logs.is_empty() { - _ = self.tx.broadcast(logs).await.map_err(|err| { - println!("failed to send log to storage: {}", err); - }); - } - - Ok(Response::new(ExportTraceServiceResponse { - partial_success: None, - })) - } -} - pub struct Service { dal: D, logs_tx: Sender>, @@ -76,14 +40,11 @@ impl Service where D: Dal + Send + Sync + 'static, { - pub fn new(logs_rx: Sender>, dal: D) -> Self { - Self { - dal, - logs_tx: logs_rx, - } + pub fn new(logs_tx: Sender>, dal: D) -> Self { + Self { dal, logs_tx } } - async fn get_logs(&self, deployment_id: String) -> Result, Error> { + async fn get_logs(&self, deployment_id: String) -> Result, Error> { let logs = self.dal.get_logs(deployment_id).await?; Ok(logs.into_iter().map(Into::into).collect()) @@ -95,6 +56,27 @@ impl Logger for Service where D: Dal + Send + Sync + 'static, { + async fn store_logs( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let logs = request.logs; + if !logs.is_empty() { + _ = self + .logs_tx + .broadcast(logs.into_iter().filter_map(Log::from_log_item).collect()) + .await + .map_err(|err| { + Status::internal(format!( + "Errored while trying to store the logs in persistence: {err}" + )) + })?; + } + + Ok(Response::new(StoreLogsResponse { success: true })) + } + async fn get_logs( &self, request: Request, @@ -108,7 +90,7 @@ where Ok(Response::new(result)) } - type GetLogsStreamStream = ReceiverStream>; + type GetLogsStreamStream = ReceiverStream>; async fn get_logs_stream( &self, @@ -126,7 +108,7 @@ where let mut last = Default::default(); for log in logs { - last = log.timestamp.clone().unwrap_or_default(); + last = log.tx_timestamp.clone().unwrap_or_default(); if let Err(error) = tx.send(Ok(log)).await { println!("error sending log: {}", error); }; @@ -134,11 +116,12 @@ where while let Ok(logs) = logs_rx.recv().await { for log in logs { - let log: LogItem = log.into(); - let this_time = log.timestamp.clone().unwrap_or_default(); - - if this_time.seconds >= last.seconds && this_time.nanos > last.nanos { - tx.send(Ok(log)).await.unwrap(); + if log.tx_timestamp.timestamp() >= last.seconds + && log.tx_timestamp.timestamp_nanos() > last.nanos.into() + { + tx.send(Ok(log.into())).await.unwrap_or_else(|_| { + error!("Errored while sending logs to persistence") + }); } } } diff --git a/logger/src/main.rs b/logger/src/main.rs index d42535f39..b5b4d3907 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -1,12 +1,11 @@ use std::time::Duration; use clap::Parser; -use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use shuttle_common::backends::{ auth::{AuthPublicKey, JwtAuthenticationLayer}, tracing::{setup_tracing, ExtractPropagationLayer}, }; -use shuttle_logger::{args::Args, Service, ShuttleLogsOtlp, Sqlite}; +use shuttle_logger::{args::Args, Service, Sqlite}; use shuttle_proto::logger::logger_server::LoggerServer; use tonic::transport::Server; use tracing::trace; @@ -29,11 +28,8 @@ async fn main() { .layer(ExtractPropagationLayer); let sqlite = Sqlite::new(&db_path.display().to_string()).await; - let svc = ShuttleLogsOtlp::new(sqlite.get_sender()); - let trace_svc = TraceServiceServer::new(svc); - let router = server_builder - .add_service(trace_svc) - .add_service(LoggerServer::new(Service::new(sqlite.get_sender(), sqlite))); + let router = + server_builder.add_service(LoggerServer::new(Service::new(sqlite.get_sender(), sqlite))); router.serve(args.address).await.unwrap(); } diff --git a/logger/tests/integration_tests.rs b/logger/tests/integration_tests.rs index ec429955d..7bf50e65e 100644 --- a/logger/tests/integration_tests.rs +++ b/logger/tests/integration_tests.rs @@ -1,465 +1,172 @@ -use std::net::{Ipv4Addr, SocketAddr}; +use std::{ + net::{Ipv4Addr, SocketAddr}, + time::{Duration, SystemTime}, +}; -use opentelemetry::KeyValue; -use opentelemetry_otlp::WithExportConfig; -use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use portpicker::pick_unused_port; use pretty_assertions::assert_eq; -use serde_json::{json, Value}; -use shuttle_common::{ - claims::Scope, - tracing::{FILEPATH_KEY, LINENO_KEY, MESSAGE_KEY, NAMESPACE_KEY, TARGET_KEY}, -}; +use prost_types::Timestamp; +use shuttle_common::claims::Scope; use shuttle_common_tests::JwtScopesLayer; -use shuttle_logger::{Service, ShuttleLogsOtlp, Sqlite}; +use shuttle_logger::{Service, Sqlite}; use shuttle_proto::logger::{ - logger_client::LoggerClient, logger_server::LoggerServer, LogItem, LogLevel, LogsRequest, + logger_client::LoggerClient, logger_server::LoggerServer, LogItem, LogLine, LogsRequest, + StoreLogsRequest, }; -use tokio::time::timeout; +use tokio::{task::JoinHandle, time::timeout}; use tonic::{transport::Server, Request}; -use tracing::{debug, error, info, instrument, trace, warn}; -use tracing_subscriber::prelude::*; -// TODO: find out why these tests affect one-another. If running them together setting the timeouts -// low will cause them to fail spuriously. If running single tests they always pass. +const SHUTTLE_SERVICE: &str = "test"; + #[tokio::test] -async fn generate_and_get_runtime_logs() { +async fn store_and_get_logs() { let port = pick_unused_port().unwrap(); - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); - const DEPLOYMENT_ID: &str = "runtime-fetch-logs-deployment-id"; - - // Start the logger server in the background. - tokio::task::spawn(async move { - let sqlite = Sqlite::new_in_memory().await; - - Server::builder() - .layer(JwtScopesLayer::new(vec![Scope::Logs])) - .add_service(TraceServiceServer::new(ShuttleLogsOtlp::new( - sqlite.get_sender(), - ))) - .add_service(LoggerServer::new(Service::new(sqlite.get_sender(), sqlite))) - .serve(addr) + let deployment_id = "runtime-fetch-logs-deployment-id"; + + let server = get_server(port); + let test = tokio::task::spawn(async move { + let dst = format!("http://localhost:{port}"); + let mut client = LoggerClient::connect(dst).await.unwrap(); + + // Get the generated logs + let expected_stored_logs = vec![ + LogItem { + deployment_id: deployment_id.to_string(), + log_line: Some(LogLine { + service_name: SHUTTLE_SERVICE.to_string(), + tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)), + data: "log 1 example".as_bytes().to_vec(), + }), + }, + LogItem { + deployment_id: deployment_id.to_string(), + log_line: Some(LogLine { + service_name: SHUTTLE_SERVICE.to_string(), + tx_timestamp: Some(Timestamp::from( + SystemTime::UNIX_EPOCH + .checked_add(Duration::from_secs(10)) + .unwrap(), + )), + data: "log 2 example".as_bytes().to_vec(), + }), + }, + ]; + let response = client + .store_logs(Request::new(StoreLogsRequest { + logs: expected_stored_logs.clone(), + })) + .await + .unwrap() + .into_inner(); + assert!(response.success); + + // Get logs + let logs = client + .get_logs(Request::new(LogsRequest { + deployment_id: deployment_id.into(), + })) .await .unwrap() + .into_inner() + .log_items; + assert_eq!( + logs, + expected_stored_logs + .into_iter() + .map(|log| log.log_line.unwrap()) + .collect::>() + ); }); - // Ensure the logger server has time to start. - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - // Start a subscriber and generate some logs. - generate_runtime_logs(port, DEPLOYMENT_ID.into(), deploy); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let dst = format!("http://localhost:{port}"); - - let mut client = LoggerClient::connect(dst).await.unwrap(); - - // Get the generated logs - let response = client - .get_logs(Request::new(LogsRequest { - deployment_id: DEPLOYMENT_ID.into(), - })) - .await - .unwrap() - .into_inner(); - - let quoted_deployment_id = format!("\"{DEPLOYMENT_ID}\""); - let expected = vec![ - MinLogItem { - level: LogLevel::Info, - fields: json!({"message": "[span] deploy", "deployment_id": quoted_deployment_id }), - }, - MinLogItem { - level: LogLevel::Error, - fields: json!({"message": "error"}), - }, - MinLogItem { - level: LogLevel::Warn, - fields: json!({"message": "warn"}), - }, - MinLogItem { - level: LogLevel::Info, - fields: json!({"message": "info", "deployment_id": DEPLOYMENT_ID.to_string()}), - }, - MinLogItem { - level: LogLevel::Debug, - fields: json!({"message": "debug"}), - }, - MinLogItem { - level: LogLevel::Trace, - fields: json!({"message": "trace"}), - }, - MinLogItem { - level: LogLevel::Info, - fields: json!({"message": "[span] span_name1", "deployment_id": quoted_deployment_id }), - }, - MinLogItem { - level: LogLevel::Trace, - fields: json!({"message": "inside span 1 event"}), - }, - ]; - - assert_eq!( - response - .log_items - .into_iter() - .map(MinLogItem::from) - .collect::>(), - expected - ); - - // Generate some logs with a fn not instrumented with deployment_id, and the - // ID not added to the tracer attributes. - generate_service_logs(port, DEPLOYMENT_ID.into(), deploy); - - let response = client - .get_logs(Request::new(LogsRequest { - deployment_id: DEPLOYMENT_ID.into(), - })) - .await - .unwrap() - .into_inner(); - - // Check that no more logs have been recorded. - assert_eq!( - response - .log_items - .into_iter() - .map(MinLogItem::from) - .collect::>(), - expected - ); + tokio::select! { + _ = server => panic!("server stopped first"), + _ = test => () + } } #[tokio::test] -async fn generate_and_get_service_logs() { +async fn get_stream_logs() { let port = pick_unused_port().unwrap(); - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); - const DEPLOYMENT_ID: &str = "service-fetch-logs-deployment-id"; + let deployment_id = "runtime-fetch-logs-deployment-id"; // Start the logger server in the background. - tokio::task::spawn(async move { - let sqlite = Sqlite::new_in_memory().await; - - Server::builder() - .layer(JwtScopesLayer::new(vec![Scope::Logs])) - .add_service(TraceServiceServer::new(ShuttleLogsOtlp::new( - sqlite.get_sender(), - ))) - .add_service(LoggerServer::new(Service::new(sqlite.get_sender(), sqlite))) - .serve(addr) + let server = get_server(port); + let test = tokio::task::spawn(async move { + let dst = format!("http://localhost:{port}"); + let mut client = LoggerClient::connect(dst).await.unwrap(); + + // Get the generated logs + let expected_stored_logs = vec![ + LogItem { + deployment_id: deployment_id.to_string(), + log_line: Some(LogLine { + service_name: SHUTTLE_SERVICE.to_string(), + tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)), + data: "log 1 example".as_bytes().to_vec(), + }), + }, + LogItem { + deployment_id: deployment_id.to_string(), + log_line: Some(LogLine { + service_name: SHUTTLE_SERVICE.to_string(), + tx_timestamp: Some(Timestamp::from( + SystemTime::UNIX_EPOCH + .checked_add(Duration::from_secs(10)) + .unwrap(), + )), + data: "log 2 example".as_bytes().to_vec(), + }), + }, + ]; + + let response = client + .store_logs(Request::new(StoreLogsRequest { + logs: expected_stored_logs.clone(), + })) .await .unwrap() - }); - - // Ensure the logger server has time to start. - // TODO: find out why setting this lower causes spurious failures of these tests. - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - - // Start a subscriber and generate some logs using an instrumented deploy function. - generate_service_logs(port, DEPLOYMENT_ID.into(), deploy_instrumented); - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; - - let dst = format!("http://localhost:{port}"); - - let mut client = LoggerClient::connect(dst).await.unwrap(); - - // Get the generated logs - let response = client - .get_logs(Request::new(LogsRequest { - deployment_id: DEPLOYMENT_ID.into(), - })) - .await - .unwrap() - .into_inner(); - - let expected = vec![ - MinLogItem { - level: LogLevel::Info, - fields: json!({"message": "[span] deploy_instrumented", "deployment_id": DEPLOYMENT_ID.to_string() }), - }, - MinLogItem { - level: LogLevel::Error, - fields: json!({"message": "error"}), - }, - MinLogItem { - level: LogLevel::Warn, - fields: json!({"message": "warn"}), - }, - MinLogItem { - level: LogLevel::Info, - fields: json!({"message": "info", "deployment_id": DEPLOYMENT_ID.to_string()}), - }, - MinLogItem { - level: LogLevel::Debug, - fields: json!({"message": "debug"}), - }, - MinLogItem { - level: LogLevel::Trace, - fields: json!({"message": "trace"}), - }, - ]; - - assert_eq!( - response - .log_items - .into_iter() - .map(MinLogItem::from) - .collect::>(), - expected - ); + .into_inner(); + assert!(response.success); + + // Subscribe to stream + let mut response = client + .get_logs_stream(Request::new(LogsRequest { + deployment_id: deployment_id.into(), + })) + .await + .unwrap() + .into_inner(); - // Generate some logs with a fn not instrumented with deployment_id. - generate_service_logs(port, DEPLOYMENT_ID.into(), deploy); + let log = timeout(std::time::Duration::from_millis(500), response.message()) + .await + .unwrap() + .unwrap() + .unwrap(); + assert_eq!(expected_stored_logs[0].clone().log_line.unwrap(), log); - let response = client - .get_logs(Request::new(LogsRequest { - deployment_id: DEPLOYMENT_ID.into(), - })) - .await - .unwrap() - .into_inner(); + let log = timeout(std::time::Duration::from_millis(500), response.message()) + .await + .unwrap() + .unwrap() + .unwrap(); + assert_eq!(expected_stored_logs[1].clone().log_line.unwrap(), log); + }); - // Check that no more logs have been recorded. - assert_eq!( - response - .log_items - .into_iter() - .map(MinLogItem::from) - .collect::>(), - expected - ); + tokio::select! { + _ = server => panic!("server stopped first"), + _ = test => () + } } -#[tokio::test] -async fn generate_and_stream_logs() { - let port = pick_unused_port().unwrap(); +fn get_server(port: u16) -> JoinHandle<()> { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); - const DEPLOYMENT_ID: &str = "stream-logs-deployment-id"; - - // Start the logger server in the background. - tokio::spawn(async move { + tokio::task::spawn(async move { let sqlite = Sqlite::new_in_memory().await; - Server::builder() .layer(JwtScopesLayer::new(vec![Scope::Logs])) - .add_service(TraceServiceServer::new(ShuttleLogsOtlp::new( - sqlite.get_sender(), - ))) .add_service(LoggerServer::new(Service::new(sqlite.get_sender(), sqlite))) .serve(addr) .await .unwrap() - }); - - // Ensure the server has started. - tokio::time::sleep(std::time::Duration::from_millis(300)).await; - - // Start a subscriber and generate some logs. - generate_runtime_logs(port, DEPLOYMENT_ID.into(), span_name1); - - // Connect to the logger server so we can fetch logs. - let dst = format!("http://localhost:{port}"); - let mut client = LoggerClient::connect(dst).await.unwrap(); - - // Subscribe to stream - let mut response = client - .get_logs_stream(Request::new(LogsRequest { - deployment_id: DEPLOYMENT_ID.into(), - })) - .await - .unwrap() - .into_inner(); - - let log = timeout(std::time::Duration::from_millis(500), response.message()) - .await - .unwrap() - .unwrap() - .unwrap(); - - let quoted_deployment_id = format!("\"{DEPLOYMENT_ID}\""); - assert_eq!( - MinLogItem::from(log), - MinLogItem { - level: LogLevel::Info, - fields: json!({"message": "[span] span_name1", "deployment_id": quoted_deployment_id}), - }, - ); - - let log = timeout(std::time::Duration::from_millis(500), response.message()) - .await - .unwrap() - .unwrap() - .unwrap(); - assert_eq!( - MinLogItem::from(log), - MinLogItem { - level: LogLevel::Trace, - fields: json!({"message": "inside span 1 event"}), - }, - ); - - // Start a subscriber and generate some more logs. - generate_runtime_logs(port, DEPLOYMENT_ID.into(), span_name2); - - let log = timeout(std::time::Duration::from_millis(500), response.message()) - .await - .unwrap() - .unwrap() - .unwrap(); - assert_eq!( - MinLogItem::from(log), - MinLogItem { - level: LogLevel::Trace, - fields: json!({"message": "inside span 2 event"}), - }, - ); - - let log = timeout(std::time::Duration::from_millis(500), response.message()) - .await - .unwrap() - .unwrap() - .unwrap(); - assert_eq!( - MinLogItem::from(log), - MinLogItem { - level: LogLevel::Info, - fields: json!({"message": "[span] span_name2", "deployment_id": quoted_deployment_id}), - }, - ); -} - -/// For the service logs the deployment id will be retrieved from the spans of functions -/// instrumented with the deployment_id field, this way we can choose which spans we want -/// to associate with a deployment and record in the logger. -fn generate_service_logs(port: u16, deployment_id: String, generator: fn(String)) { - generate_logs( - port, - deployment_id, - generator, - vec![KeyValue::new("service.name", "test")], - ); -} - -/// For the shuttle-runtime logs we want to add the deployment id to the top level attributes, -/// this way we can associate any logs coming from a runtime with a deployment. -fn generate_runtime_logs(port: u16, deployment_id: String, generator: fn(String)) { - generate_logs( - port, - deployment_id.clone(), - generator, - vec![ - KeyValue::new("service.name", "test"), - KeyValue::new("deployment_id", deployment_id), - ], - ); -} - -/// Helper function to setup a tracing subscriber and run an instrumented fn to produce logs. -fn generate_logs( - port: u16, - deployment_id: String, - generator: fn(String), - resources: Vec, -) { - // Set up tracing subscriber connected to the logger server. - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(format!("http://127.0.0.1:{port}")), - ) - .with_trace_config( - opentelemetry::sdk::trace::config() - .with_resource(opentelemetry::sdk::Resource::new(resources)), - ) - .install_simple() - .unwrap(); - let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); - - let _guard = tracing_subscriber::registry() - .with(otel_layer) - .set_default(); - - // Generate some logs. - generator(deployment_id); -} - -// deployment_id attribute not set. -#[instrument] -fn deploy(deployment_id: String) { - error!("error"); - warn!("warn"); - info!(%deployment_id, "info"); - debug!("debug"); - trace!("trace"); - // This tests that we handle nested spans. - span_name1(deployment_id); -} - -#[instrument(fields(%deployment_id))] -fn deploy_instrumented(deployment_id: String) { - error!("error"); - warn!("warn"); - info!(%deployment_id, "info"); - debug!("debug"); - trace!("trace"); -} - -#[instrument] -fn span_name1(deployment_id: String) { - trace!("inside span 1 event"); -} - -#[instrument] -fn span_name2(deployment_id: String) { - trace!("inside span 2 event"); -} - -#[derive(Debug, Eq, PartialEq)] -struct MinLogItem { - level: LogLevel, - fields: Value, -} - -impl From for MinLogItem { - fn from(log: LogItem) -> Self { - assert_eq!(log.service_name, "test"); - - let fields = if log.fields.is_empty() { - Value::Null - } else { - let mut fields: Value = serde_json::from_slice(&log.fields).unwrap(); - - let map = fields.as_object_mut().unwrap(); - - let message = map.get(MESSAGE_KEY).unwrap(); - // Span logs don't contain a target field - if !message.as_str().unwrap().starts_with("[span] ") { - let target = map.remove(TARGET_KEY).unwrap(); - assert_eq!(target, "integration_tests"); - } else { - // We want to remove what's not of interest for checking - // the spans are containing the right information. - let _ = map.remove("busy_ns").unwrap(); - let _ = map.remove("idle_ns").unwrap(); - let _ = map.remove("thread.id").unwrap(); - let _ = map.remove("thread.name").unwrap(); - } - - let filepath = map.remove(FILEPATH_KEY).unwrap(); - assert_eq!(filepath, "logger/tests/integration_tests.rs"); - - map.remove(LINENO_KEY).unwrap(); - map.remove(NAMESPACE_KEY).unwrap(); - - fields - }; - - Self { - level: log.level(), - fields, - } - } + }) } diff --git a/proto/logger.proto b/proto/logger.proto index f9857b72c..6398dc7ba 100644 --- a/proto/logger.proto +++ b/proto/logger.proto @@ -4,11 +4,22 @@ package logger; import "google/protobuf/timestamp.proto"; service Logger { + // Store logs + rpc StoreLogs(StoreLogsRequest) returns (StoreLogsResponse); + // Get stored logs rpc GetLogs(LogsRequest) returns (LogsResponse); // Get fresh logs as they are incoming - rpc GetLogsStream(LogsRequest) returns (stream LogItem); + rpc GetLogsStream(LogsRequest) returns (stream LogLine); +} + +message StoreLogsRequest { + repeated LogItem logs = 1; +} + +message StoreLogsResponse { + bool success = 1; } message LogsRequest { @@ -16,20 +27,16 @@ message LogsRequest { } message LogsResponse { - repeated LogItem log_items = 1; + repeated LogLine log_items = 1; } message LogItem { - string service_name = 1; - google.protobuf.Timestamp timestamp = 2; - LogLevel level = 3; - bytes fields = 4; + string deployment_id = 1; + LogLine log_line = 2; } -enum LogLevel { - Trace = 0; - Debug = 1; - Info = 2; - Warn = 3; - Error = 4; +message LogLine { + string service_name = 1; + google.protobuf.Timestamp tx_timestamp = 2; + bytes data = 3; } diff --git a/proto/src/generated/logger.rs b/proto/src/generated/logger.rs index 4da60775a..9ccc99a87 100644 --- a/proto/src/generated/logger.rs +++ b/proto/src/generated/logger.rs @@ -1,5 +1,17 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct StoreLogsRequest { + #[prost(message, repeated, tag = "1")] + pub logs: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct StoreLogsResponse { + #[prost(bool, tag = "1")] + pub success: bool, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct LogsRequest { #[prost(string, tag = "1")] pub deployment_id: ::prost::alloc::string::String, @@ -8,54 +20,25 @@ pub struct LogsRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct LogsResponse { #[prost(message, repeated, tag = "1")] - pub log_items: ::prost::alloc::vec::Vec, + pub log_items: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct LogItem { #[prost(string, tag = "1")] - pub service_name: ::prost::alloc::string::String, + pub deployment_id: ::prost::alloc::string::String, #[prost(message, optional, tag = "2")] - pub timestamp: ::core::option::Option<::prost_types::Timestamp>, - #[prost(enumeration = "LogLevel", tag = "3")] - pub level: i32, - #[prost(bytes = "vec", tag = "4")] - pub fields: ::prost::alloc::vec::Vec, -} -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] -pub enum LogLevel { - Trace = 0, - Debug = 1, - Info = 2, - Warn = 3, - Error = 4, + pub log_line: ::core::option::Option, } -impl LogLevel { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - LogLevel::Trace => "Trace", - LogLevel::Debug => "Debug", - LogLevel::Info => "Info", - LogLevel::Warn => "Warn", - LogLevel::Error => "Error", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "Trace" => Some(Self::Trace), - "Debug" => Some(Self::Debug), - "Info" => Some(Self::Info), - "Warn" => Some(Self::Warn), - "Error" => Some(Self::Error), - _ => None, - } - } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LogLine { + #[prost(string, tag = "1")] + pub service_name: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub tx_timestamp: ::core::option::Option<::prost_types::Timestamp>, + #[prost(bytes = "vec", tag = "3")] + pub data: ::prost::alloc::vec::Vec, } /// Generated client implementations. pub mod logger_client { @@ -126,6 +109,24 @@ pub mod logger_client { self.inner = self.inner.accept_compressed(encoding); self } + /// Store logs + pub async fn store_logs( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/logger.Logger/StoreLogs"); + self.inner.unary(request.into_request(), path, codec).await + } /// Get stored logs pub async fn get_logs( &mut self, @@ -149,7 +150,7 @@ pub mod logger_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, + tonic::Response>, tonic::Status, > { self.inner @@ -176,6 +177,11 @@ pub mod logger_server { /// Generated trait containing gRPC methods that should be implemented for use with LoggerServer. #[async_trait] pub trait Logger: Send + Sync + 'static { + /// Store logs + async fn store_logs( + &self, + request: tonic::Request, + ) -> Result, tonic::Status>; /// Get stored logs async fn get_logs( &self, @@ -183,7 +189,7 @@ pub mod logger_server { ) -> Result, tonic::Status>; /// Server streaming response type for the GetLogsStream method. type GetLogsStreamStream: futures_core::Stream< - Item = Result, + Item = Result, > + Send + 'static; @@ -252,6 +258,42 @@ pub mod logger_server { fn call(&mut self, req: http::Request) -> Self::Future { let inner = self.inner.clone(); match req.uri().path() { + "/logger.Logger/StoreLogs" => { + #[allow(non_camel_case_types)] + struct StoreLogsSvc(pub Arc); + impl tonic::server::UnaryService + for StoreLogsSvc { + type Response = super::StoreLogsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { (*inner).store_logs(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = StoreLogsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/logger.Logger/GetLogs" => { #[allow(non_camel_case_types)] struct GetLogsSvc(pub Arc); @@ -295,7 +337,7 @@ pub mod logger_server { T: Logger, > tonic::server::ServerStreamingService for GetLogsStreamSvc { - type Response = super::LogItem; + type Response = super::LogLine; type ResponseStream = T::GetLogsStreamStream; type Future = BoxFuture< tonic::Response, diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 69ebfc0ca..04e371ebe 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -231,71 +231,5 @@ pub mod resource_recorder { } pub mod logger { - use chrono::{DateTime, NaiveDateTime, Utc}; - use shuttle_common::tracing::{FILEPATH_KEY, LINENO_KEY, TARGET_KEY}; - use tracing::error; - include!("generated/logger.rs"); - - impl From for shuttle_common::log::Level { - fn from(level: LogLevel) -> Self { - match level { - LogLevel::Trace => Self::Trace, - LogLevel::Debug => Self::Debug, - LogLevel::Info => Self::Info, - LogLevel::Warn => Self::Warn, - LogLevel::Error => Self::Error, - } - } - } - - impl From for shuttle_common::LogItem { - fn from(value: LogItem) -> Self { - let proto_timestamp = value.timestamp.clone().unwrap_or_default(); - let level = value.level(); - let mut fields: serde_json::Map = - match serde_json::from_slice(&value.fields) { - Ok(serde_json::Value::Object(o)) => o, - Ok(_) => { - error!("unexpected JSON value, expected an object"); - serde_json::Map::new() - } - Err(err) => { - error!("malformed fields object: {err}"); - serde_json::Map::new() - } - }; - - // Safe to unwrap since we've previously serialised the fields we're removing below. - let file = fields - .remove(FILEPATH_KEY) - .map(|v| v.as_str().unwrap_or_default().to_string()); - let line = fields - .remove(LINENO_KEY) - .map(|v| u32::try_from(v.as_u64().unwrap_or_default()).unwrap_or_default()); - let target = fields - .remove(TARGET_KEY) - .map(|v| v.as_str().unwrap_or_default().to_string()) - .unwrap_or_default(); - - Self { - id: Default::default(), - timestamp: DateTime::from_utc( - NaiveDateTime::from_timestamp_opt( - proto_timestamp.seconds, - proto_timestamp.nanos.try_into().unwrap_or_default(), - ) - .unwrap_or_default(), - Utc, - ), - // TODO: update this to the corresponding state shown in the runtime log, when present - state: shuttle_common::deployment::State::Running, - level: level.into(), - file, - line, - target, - fields: serde_json::to_vec(&fields).unwrap_or_default(), - } - } - } } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index fa40f29f0..729028029 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -16,8 +16,6 @@ doctest = false anyhow = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } -opentelemetry = { workspace = true } -opentelemetry-otlp = { workspace = true } prost-types = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -28,12 +26,6 @@ tokio-stream = "0.1.11" tonic = { workspace = true } tower = { workspace = true } tracing = { workspace = true, features = ["default"] } -tracing-opentelemetry = { workspace = true } -tracing-subscriber = { workspace = true, features = [ - "default", - "env-filter", - "fmt", -] } cap-std = { workspace = true, optional = true } futures = { workspace = true, optional = true } hyper = { workspace = true, optional = true } @@ -48,7 +40,7 @@ features = ["frameworks"] [dependencies.shuttle-common] workspace = true -features = ["claims", "backend"] +features = ["claims"] [dependencies.shuttle-proto] workspace = true @@ -58,7 +50,6 @@ workspace = true [dev-dependencies] crossbeam-channel = { workspace = true } -opentelemetry-proto = { workspace = true } portpicker = "0.1.1" futures = { workspace = true } shuttle-service = { workspace = true, features = ["builder"] } diff --git a/runtime/src/bin/shuttle-next.rs b/runtime/src/bin/shuttle-next.rs index b99c4f2ed..338f6e84b 100644 --- a/runtime/src/bin/shuttle-next.rs +++ b/runtime/src/bin/shuttle-next.rs @@ -3,7 +3,7 @@ use std::{ time::Duration, }; -use shuttle_common::backends::tracing::{setup_tracing, ExtractPropagationLayer}; +use shuttle_common::backends::tracing::ExtractPropagationLayer; use shuttle_proto::runtime::runtime_server::RuntimeServer; use shuttle_runtime::{print_version, AxumWasm, NextArgs}; use tonic::transport::Server; @@ -20,8 +20,6 @@ async fn main() { let args = NextArgs::parse().unwrap(); - setup_tracing(tracing_subscriber::registry(), "shuttle-next"); - trace!(args = ?args, "parsed args"); let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), args.port); diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index d361599d7..3227616f8 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -277,29 +277,17 @@ mod resource_tracker; pub use alpha::{start, Alpha}; #[cfg(feature = "next")] pub use next::{AxumWasm, NextArgs}; -pub use opentelemetry; -pub use opentelemetry_otlp; pub use provisioner_factory::ProvisionerFactory; pub use resource_tracker::{get_resource, ResourceTracker}; pub use shuttle_common::storage_manager::StorageManager; pub use shuttle_service::{CustomError, Error, Factory, ResourceBuilder, Service}; -pub use tracing_opentelemetry; pub use async_trait::async_trait; -pub type Registry = tracing_subscriber::layer::Layered< - tracing_opentelemetry::OpenTelemetryLayer< - tracing_subscriber::layer::Layered, - opentelemetry::sdk::trace::Tracer, - >, - tracing_subscriber::layer::Layered, ->; - // Dependencies required by the codegen pub use anyhow::Context; pub use strfmt::strfmt; pub use tracing; -pub use tracing_subscriber; // Print the version of the runtime. pub fn print_version() { diff --git a/runtime/tests/integration/helpers.rs b/runtime/tests/integration/helpers.rs index 61a06d0f1..fe2e796f5 100644 --- a/runtime/tests/integration/helpers.rs +++ b/runtime/tests/integration/helpers.rs @@ -6,10 +6,6 @@ use std::{ use anyhow::Result; use async_trait::async_trait; -use opentelemetry_proto::tonic::collector::trace::v1::{ - trace_service_server::{TraceService, TraceServiceServer}, - ExportTraceServiceRequest, ExportTraceServiceResponse, -}; use shuttle_common::claims::{ClaimService, InjectPropagation}; use shuttle_proto::{ provisioner::{ @@ -60,7 +56,6 @@ pub async fn spawn_runtime(project_path: String, service_name: &str) -> Result, - ) -> std::result::Result, tonic::Status> { - println!("request: {request:?}"); - - Ok(Response::new(Default::default())) - } -} diff --git a/runtime/tests/integration/loader.rs b/runtime/tests/integration/loader.rs index 4b4f24cf8..e1e3d6bba 100644 --- a/runtime/tests/integration/loader.rs +++ b/runtime/tests/integration/loader.rs @@ -98,6 +98,7 @@ async fn bind_panic_owned() { assert_ne!(reason.message, ""); assert_eq!(reason.message, "panic in bind"); } + #[tokio::test] async fn loader_panic() { let project_path = format!(