diff --git a/Cargo.lock b/Cargo.lock index b512556e63..defcd4f4d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1021,7 +1021,6 @@ dependencies = [ "serde_json", "tokio", "tower", - "tracing-subscriber", ] [[package]] @@ -5397,9 +5396,9 @@ dependencies = [ [[package]] name = "test-span" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93924e1fa984429f7b8a1b4fc9652a8ceac21fcd4aefcf7f881e247f4118b180" +checksum = "88c12e1076fc168ae4b9870a29cabf575d318f5399f025c2248a8deae057ed12" dependencies = [ "daggy", "derivative", @@ -5418,9 +5417,9 @@ dependencies = [ [[package]] name = "test-span-macro" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2529044a35c9d1000884dc46ea88e50410f33871149b0f649a648a91728dc841" +checksum = "1f972445f2c781bb6d47ee4a715db3a0e404a79d977f751fd4eb2b0d44c6eb9d" dependencies = [ "proc-macro2", "quote", diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 5312651b34..2a8862d60c 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -43,6 +43,20 @@ telemetry: By [@SimonSapin](https://github.com/SimonSapin) +### Remove telemetry configuration hot reloading ([PR #1463](https://github.com/apollographql/router/pull/1463)) + +Configuration hot reloading is not very useful for telemetry, and is the +source of regular bugs that are hard to fix. + +This removes the support for configuration reloading entirely. Now, the +router will reject a configuration reload with an error log if the +telemetry configuration changed. + +It is now possible to create a subscriber and pass it explicitely to the telemetry plugin +when creating it. It will then be modified to integrate the telemetry plugin's layer. + +By [@geal](https://github.com/geal) in https://github.com/apollographql/router/pull/1463 + ## 🚀 Features ### Add support of global rate limit and timeout. [PR #1347](https://github.com/apollographql/router/pull/1347) diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index 9495c6dedb..a54a308a9f 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -166,7 +166,7 @@ tempfile = "3.3.0" test-log = { version = "0.2.10", default-features = false, features = [ "trace", ] } -test-span = "0.6" +test-span = "0.7" tower-test = "0.4.0" tracing-subscriber = { version = "0.3", default-features = false, features = [ "env-filter", diff --git a/apollo-router/src/configuration/mod.rs b/apollo-router/src/configuration/mod.rs index 36386f1537..0006b8a848 100644 --- a/apollo-router/src/configuration/mod.rs +++ b/apollo-router/src/configuration/mod.rs @@ -93,6 +93,7 @@ pub struct Configuration { } const APOLLO_PLUGIN_PREFIX: &str = "apollo."; +const TELEMETRY_KEY: &str = "telemetry"; fn default_listen() -> ListenAddr { SocketAddr::from_str("127.0.0.1:4000").unwrap().into() @@ -143,6 +144,17 @@ impl Configuration { plugins } + + // checks that we can reload configuration from the current one to the new one + pub fn is_compatible(&self, new: &Configuration) -> Result<(), &'static str> { + if self.apollo_plugins.plugins.get(TELEMETRY_KEY) + == new.apollo_plugins.plugins.get(TELEMETRY_KEY) + { + Ok(()) + } else { + Err("incompatible telemetry configuration. Telemetry cannot be reloaded and its configuration must stay the same for the entire life of the process") + } + } } impl FromStr for Configuration { diff --git a/apollo-router/src/error.rs b/apollo-router/src/error.rs index ed6831b30e..b81ff8ee38 100644 --- a/apollo-router/src/error.rs +++ b/apollo-router/src/error.rs @@ -22,7 +22,6 @@ pub(crate) use crate::graphql::Error; use crate::graphql::Response; use crate::json_ext::Path; use crate::json_ext::Value; -pub use crate::reload::Error as ReloadError; pub use crate::spec::SpecError; /// Error types for execution. diff --git a/apollo-router/src/executable.rs b/apollo-router/src/executable.rs index f573e477b4..587b184b6d 100644 --- a/apollo-router/src/executable.rs +++ b/apollo-router/src/executable.rs @@ -15,6 +15,9 @@ use clap::CommandFactory; use clap::Parser; use directories::ProjectDirs; use once_cell::sync::OnceCell; +use tracing::dispatcher::with_default; +use tracing::dispatcher::Dispatch; +use tracing::instrument::WithSubscriber; use tracing_subscriber::EnvFilter; use url::ParseError; use url::Url; @@ -26,10 +29,8 @@ use crate::router::ApolloRouter; use crate::router::ConfigurationKind; use crate::router::SchemaKind; use crate::router::ShutdownKind; -use crate::subscriber::set_global_subscriber; -use crate::subscriber::RouterSubscriber; -static GLOBAL_ENV_FILTER: OnceCell = OnceCell::new(); +pub(crate) static GLOBAL_ENV_FILTER: OnceCell = OnceCell::new(); /// Options for the router #[derive(Parser, Debug)] @@ -192,24 +193,33 @@ impl Executable { return Ok(()); } - // This is more complex than I'd like it to be. Really, we just want to pass - // a FmtSubscriber to set_global_subscriber(), but we can't because of the - // generic nature of FmtSubscriber. See: https://github.com/tokio-rs/tracing/issues/380 - // for more details. let builder = tracing_subscriber::fmt::fmt().with_env_filter( EnvFilter::try_new(&opt.log_level).context("could not parse log configuration")?, ); - let subscriber: RouterSubscriber = if atty::is(atty::Stream::Stdout) { - RouterSubscriber::TextSubscriber(builder.finish()) + let dispatcher = if atty::is(atty::Stream::Stdout) { + Dispatch::new(builder.finish()) } else { - RouterSubscriber::JsonSubscriber(builder.json().finish()) + Dispatch::new(builder.json().finish()) }; - set_global_subscriber(subscriber)?; + GLOBAL_ENV_FILTER.set(opt.log_level.clone()).expect( + "failed setting the global env filter. THe start() function should only be called once", + ); - GLOBAL_ENV_FILTER.set(opt.log_level).unwrap(); + // The dispatcher we created is passed explicitely here to make sure we display the logs + // in the initialization pahse and in the state machine code, before a global subscriber + // is set using the configuration file + Self::inner_start(router_builder_fn, opt, dispatcher.clone()) + .with_subscriber(dispatcher) + .await + } + async fn inner_start( + router_builder_fn: Option ApolloRouter>, + opt: Opt, + dispatcher: Dispatch, + ) -> Result<()> { let current_directory = std::env::current_dir()?; let configuration = opt @@ -229,11 +239,12 @@ impl Executable { } }) .unwrap_or_else(|| Configuration::builder().build().into()); + let apollo_router_msg = format!("Apollo Router v{} // (c) Apollo Graph, Inc. // Licensed as ELv2 (https://go.apollo.dev/elv2)", std::env!("CARGO_PKG_VERSION")); let schema = match (opt.supergraph_path, opt.apollo_key) { (Some(supergraph_path), _) => { tracing::info!("{apollo_router_msg}"); - setup_panic_handler(); + setup_panic_handler(dispatcher.clone()); let supergraph_path = if supergraph_path.is_relative() { current_directory.join(supergraph_path) @@ -248,6 +259,7 @@ impl Executable { } (None, Some(apollo_key)) => { tracing::info!("{apollo_router_msg}"); + let apollo_graph_ref = opt.apollo_graph_ref.ok_or_else(||anyhow!("cannot fetch the supergraph from Apollo Studio without setting the APOLLO_GRAPH_REF environment variable"))?; if opt.apollo_uplink_poll_interval < Duration::from_secs(10) { return Err(anyhow!("Apollo poll interval must be at least 10s")); @@ -322,7 +334,7 @@ impl Executable { } } -fn setup_panic_handler() { +fn setup_panic_handler(dispatcher: Dispatch) { // Redirect panics to the logs. let backtrace_env = std::env::var("RUST_BACKTRACE"); let show_backtraces = @@ -331,12 +343,14 @@ fn setup_panic_handler() { tracing::warn!("RUST_BACKTRACE={} detected. This use useful for diagnostics but will have a performance impact and may leak sensitive information", backtrace_env.as_ref().unwrap()); } std::panic::set_hook(Box::new(move |e| { - if show_backtraces { - let backtrace = backtrace::Backtrace::new(); - tracing::error!("{}\n{:?}", e, backtrace) - } else { - tracing::error!("{}", e) - } + with_default(&dispatcher, || { + if show_backtraces { + let backtrace = backtrace::Backtrace::new(); + tracing::error!("{}\n{:?}", e, backtrace) + } else { + tracing::error!("{}", e) + } + }); })); } diff --git a/apollo-router/src/lib.rs b/apollo-router/src/lib.rs index 88348f967b..4dc4db04f2 100644 --- a/apollo-router/src/lib.rs +++ b/apollo-router/src/lib.rs @@ -62,7 +62,6 @@ pub mod layers; pub mod plugin; pub mod plugins; pub mod query_planner; -mod reload; mod request; mod response; mod router; @@ -70,7 +69,6 @@ mod router_factory; pub mod services; mod spec; mod state_machine; -pub mod subscriber; mod traits; pub use configuration::*; diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index bdf0978b9a..a238035e5c 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -3,12 +3,16 @@ use std::collections::HashMap; use std::error::Error; use std::fmt; +use std::sync::atomic::AtomicU8; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use std::time::Instant; use ::tracing::info_span; +use ::tracing::subscriber::set_global_default; use ::tracing::Span; +use ::tracing::Subscriber; use apollo_spaceport::server::ReportSpaceport; use apollo_spaceport::StatsContext; use bytes::Bytes; @@ -18,6 +22,7 @@ use futures::StreamExt; use http::HeaderValue; use http::StatusCode; use metrics::apollo::Sender; +use once_cell::sync::OnceCell; use opentelemetry::global; use opentelemetry::propagation::TextMapPropagator; use opentelemetry::sdk::propagation::BaggagePropagator; @@ -25,7 +30,6 @@ use opentelemetry::sdk::propagation::TextMapCompositePropagator; use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::sdk::trace::Builder; use opentelemetry::trace::SpanKind; -use opentelemetry::trace::Tracer; use opentelemetry::trace::TracerProvider; use opentelemetry::KeyValue; use router_bridge::planner::UsageReporting; @@ -35,11 +39,16 @@ use tower::util::BoxService; use tower::BoxError; use tower::ServiceBuilder; use tower::ServiceExt; +use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::Registry; use url::Url; use self::config::Conf; use self::metrics::AttributesForwardConf; use self::metrics::MetricsAttributesConf; +use crate::executable::GLOBAL_ENV_FILTER; use crate::http_ext; use crate::layers::ServiceBuilderExt; use crate::plugin::Handler; @@ -59,7 +68,6 @@ use crate::plugins::telemetry::metrics::MetricsExporterHandle; use crate::plugins::telemetry::tracing::TracingConfigurator; use crate::query_planner::USAGE_REPORTING; use crate::register_plugin; -use crate::subscriber::replace_layer; use crate::Context; use crate::ExecutionRequest; use crate::ExecutionResponse; @@ -84,9 +92,11 @@ const SUBGRAPH_ATTRIBUTES: &str = "apollo_telemetry::subgraph_metrics_attributes pub(crate) static STUDIO_EXCLUDE: &str = "apollo_telemetry::studio::exclude"; const DEFAULT_SERVICE_NAME: &str = "apollo-router"; +static TELEMETRY_LOADED: OnceCell = OnceCell::new(); +static TELEMETRY_REFCOUNT: AtomicU8 = AtomicU8::new(0); + pub struct Telemetry { config: config::Conf, - tracer_provider: Option, // Do not remove _metrics_exporters. Metrics will not be exported if it is removed. // Typically the handles are a PushController but may be something else. Dropping the handle will // shutdown exporter. @@ -132,23 +142,17 @@ fn setup_metrics_exporter( impl Drop for Telemetry { fn drop(&mut self) { - if let Some(tracer_provider) = self.tracer_provider.take() { - // Tracer providers must be flushed. This may happen as part of otel if the provider was set - // as the global, but may also happen in the case of an failed config reload. - // If the tracer prover is present then it was not handed over so we must flush it. - // We must make the call to force_flush() from a separate thread to prevent hangs: - // see https://github.com/open-telemetry/opentelemetry-rust/issues/536. - ::tracing::debug!("flushing telemetry"); - let jh = tokio::task::spawn_blocking(move || { - opentelemetry::trace::TracerProvider::force_flush(&tracer_provider); - }); - futures::executor::block_on(jh).expect("failed to flush tracer provider"); - } - if let Some(sender) = self.spaceport_shutdown.take() { ::tracing::debug!("notifying spaceport to shut down"); let _ = sender.send(()); } + + let count = TELEMETRY_REFCOUNT.fetch_sub(1, Ordering::Relaxed); + if count < 2 { + std::thread::spawn(|| { + opentelemetry::global::shutdown_tracer_provider(); + }); + } } } @@ -156,104 +160,8 @@ impl Drop for Telemetry { impl Plugin for Telemetry { type Config = config::Conf; - fn activate(&mut self) { - // The active service is about to be swapped in. - // The rest of this code in this method is expected to succeed. - // The issue is that Otel uses globals for a bunch of stuff. - // If we move to a completely tower based architecture then we could make this nicer. - let tracer_provider = self - .tracer_provider - .take() - .expect("trace_provider will have been set in startup, qed"); - let tracer = tracer_provider.versioned_tracer( - "apollo-router", - Some(env!("CARGO_PKG_VERSION")), - None, - ); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - Self::replace_tracer_provider(tracer_provider); - - replace_layer(Box::new(telemetry)) - .expect("set_global_subscriber() was not called at startup, fatal"); - opentelemetry::global::set_error_handler(handle_error) - .expect("otel error handler lock poisoned, fatal"); - global::set_text_map_propagator(Self::create_propagator(&self.config)); - } - async fn new(init: PluginInit) -> Result { - // Apollo config is special because we enable tracing if some env variables are present. - let mut config = init.config; - let apollo = config - .apollo - .as_mut() - .expect("telemetry apollo config must be present"); - - // If we have key and graph ref but no endpoint we start embedded spaceport - let (spaceport, shutdown_tx) = match apollo { - apollo::Config { - apollo_key: Some(_), - apollo_graph_ref: Some(_), - endpoint: None, - .. - } => { - ::tracing::debug!("starting Spaceport"); - let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); - let report_spaceport = ReportSpaceport::new( - "127.0.0.1:0".parse()?, - Some(Box::pin(shutdown_rx.map(|_| ()))), - ) - .await?; - // Now that the port is known update the config - apollo.endpoint = Some(Url::parse(&format!( - "https://{}", - report_spaceport.address() - ))?); - (Some(report_spaceport), Some(shutdown_tx)) - } - _ => (None, None), - }; - - // Setup metrics - // The act of setting up metrics will overwrite a global meter. However it is essential that - // we use the aggregate meter provider that is created below. It enables us to support - // sending metrics to multiple providers at once, of which hopefully Apollo Studio will - // eventually be one. - let mut builder = Self::create_metrics_exporters(&config)?; - - //// THIS IS IMPORTANT - // Once the trace provider has been created this method MUST NOT FAIL - // The trace provider will not be shut down if drop is not called and it will result in a hang. - // Don't add anything fallible after the tracer provider has been created. - let tracer_provider = Self::create_tracer_provider(&config)?; - - let plugin = Ok(Telemetry { - spaceport_shutdown: shutdown_tx, - tracer_provider: Some(tracer_provider), - custom_endpoints: builder.custom_endpoints(), - _metrics_exporters: builder.exporters(), - meter_provider: builder.meter_provider(), - apollo_metrics_sender: builder.apollo_metrics_provider(), - config, - }); - - // We're safe now for shutdown. - // Start spaceport - if let Some(spaceport) = spaceport { - tokio::spawn(async move { - if let Err(e) = spaceport.serve().await { - match e.source() { - Some(source) => { - ::tracing::warn!("spaceport did not terminate normally: {}", source); - } - None => { - ::tracing::warn!("spaceport did not terminate normally: {}", e); - } - } - }; - }); - } - - plugin + Self::new_with_subscriber::(init, None).await } fn router_service( @@ -623,6 +531,133 @@ impl Plugin for Telemetry { } impl Telemetry { + /// This method can be used instead of `Plugin::new` to override the subscriber + pub async fn new_with_subscriber( + init: PluginInit<::Config>, + subscriber: Option, + ) -> Result + where + S: Subscriber + Send + Sync + for<'span> LookupSpan<'span>, + { + // Apollo config is special because we enable tracing if some env variables are present. + let mut config = init.config; + let apollo = config + .apollo + .as_mut() + .expect("telemetry apollo config must be present"); + + // If we have key and graph ref but no endpoint we start embedded spaceport + let (spaceport, shutdown_tx) = match apollo { + apollo::Config { + apollo_key: Some(_), + apollo_graph_ref: Some(_), + endpoint: None, + .. + } => { + ::tracing::debug!("starting Spaceport"); + let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); + let report_spaceport = ReportSpaceport::new( + "127.0.0.1:0".parse()?, + Some(Box::pin(shutdown_rx.map(|_| ()))), + ) + .await?; + // Now that the port is known update the config + apollo.endpoint = Some(Url::parse(&format!( + "https://{}", + report_spaceport.address() + ))?); + (Some(report_spaceport), Some(shutdown_tx)) + } + _ => (None, None), + }; + + // Setup metrics + // The act of setting up metrics will overwrite a global meter. However it is essential that + // we use the aggregate meter provider that is created below. It enables us to support + // sending metrics to multiple providers at once, of which hopefully Apollo Studio will + // eventually be one. + let mut builder = Self::create_metrics_exporters(&config)?; + + // the global tracer and subscriber initialization step must be performed only once + TELEMETRY_LOADED.get_or_try_init::<_, BoxError>(|| { + use anyhow::Context; + let tracer_provider = Self::create_tracer_provider(&config)?; + + let tracer = tracer_provider.versioned_tracer( + "apollo-router", + Some(env!("CARGO_PKG_VERSION")), + None, + ); + + opentelemetry::global::set_tracer_provider(tracer_provider); + opentelemetry::global::set_error_handler(handle_error) + .expect("otel error handler lock poisoned, fatal"); + global::set_text_map_propagator(Self::create_propagator(&config)); + + let log_level = GLOBAL_ENV_FILTER + .get() + .map(|s| s.as_str()) + .unwrap_or("info"); + + let sub_builder = tracing_subscriber::fmt::fmt().with_env_filter( + EnvFilter::try_new(log_level).context("could not parse log configuration")?, + ); + + if let Some(sub) = subscriber { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let subscriber = sub.with(telemetry); + if let Err(e) = set_global_default(subscriber) { + ::tracing::error!("cannot set global subscriber: {:?}", e); + } + } else if atty::is(atty::Stream::Stdout) { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + let subscriber = sub_builder.finish().with(telemetry); + if let Err(e) = set_global_default(subscriber) { + ::tracing::error!("cannot set global subscriber: {:?}", e); + } + } else { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + let subscriber = sub_builder.json().finish().with(telemetry); + if let Err(e) = set_global_default(subscriber) { + ::tracing::error!("cannot set global subscriber: {:?}", e); + } + }; + + Ok(true) + })?; + + let plugin = Ok(Telemetry { + spaceport_shutdown: shutdown_tx, + custom_endpoints: builder.custom_endpoints(), + _metrics_exporters: builder.exporters(), + meter_provider: builder.meter_provider(), + apollo_metrics_sender: builder.apollo_metrics_provider(), + config, + }); + + // We're safe now for shutdown. + // Start spaceport + if let Some(spaceport) = spaceport { + tokio::spawn(async move { + if let Err(e) = spaceport.serve().await { + match e.source() { + Some(source) => { + ::tracing::warn!("spaceport did not terminate normally: {}", source); + } + None => { + ::tracing::warn!("spaceport did not terminate normally: {}", e); + } + } + }; + }); + } + + let _ = TELEMETRY_REFCOUNT.fetch_add(1, Ordering::Relaxed); + plugin + } + fn create_propagator(config: &config::Conf) -> TextMapCompositePropagator { let propagation = config .clone() @@ -708,20 +743,6 @@ impl Telemetry { ) } - fn replace_tracer_provider(tracer_provider: T) - where - T: TracerProvider + Send + Sync + 'static, - ::Tracer: Send + Sync + 'static, - <::Tracer as Tracer>::Span: - Send + Sync + 'static, - { - let jh = tokio::task::spawn_blocking(|| { - opentelemetry::global::force_flush_tracer_provider(); - opentelemetry::global::set_tracer_provider(tracer_provider); - }); - futures::executor::block_on(jh).expect("failed to replace tracer provider"); - } - fn router_service_span(config: apollo::Config) -> impl Fn(&RouterRequest) -> Span + Clone { let client_name_header = config.client_name_header; let client_version_header = config.client_version_header; diff --git a/apollo-router/src/reload.rs b/apollo-router/src/reload.rs deleted file mode 100644 index f7bc8e0633..0000000000 --- a/apollo-router/src/reload.rs +++ /dev/null @@ -1,275 +0,0 @@ -//! THIS IS PORTED FROM THE tracing_subscriber CRATE. SLIGHTLY MODIFIED -//! TO ADD SUPPORT FOR `downcast_raw`. THE FIX IS TEMPORARY -//! WHILST WE FIGURE OUT HOW TO PROGRESS THIS WITH THE CRATE OWNERS. -//! -//! SEE THE COMMENT ON THE `downcast_raw` FN FOR MORE DETAILS. -//! -//! Wrapper for a `Layer` to allow it to be dynamically reloaded. -//! -//! This module provides a [`Layer` type] which wraps another type implementing -//! the [`Layer` trait], allowing the wrapped type to be replaced with another -//! instance of that type at runtime. -//! -//! This can be used in cases where a subset of `Subscriber` functionality -//! should be dynamically reconfigured, such as when filtering directives may -//! change at runtime. Note that this layer introduces a (relatively small) -//! amount of overhead, and should thus only be used as needed. -//! -//! [`Layer` type]: struct.Layer.html -//! [`Layer` trait]: ../layer/trait.Layer.html -use std::any::TypeId; -use std::error; -use std::fmt; -use std::marker::PhantomData; -use std::sync::Arc; -use std::sync::RwLock; -use std::sync::Weak; - -use tracing_core::callsite; -use tracing_core::span; -use tracing_core::subscriber::Interest; -use tracing_core::subscriber::Subscriber; -use tracing_core::Event; -use tracing_core::Metadata; -use tracing_subscriber::layer; - -macro_rules! try_lock { - ($lock:expr) => { - try_lock!($lock, else return) - }; - ($lock:expr, else $els:expr) => { - if let Ok(l) = $lock { - l - } else if std::thread::panicking() { - $els - } else { - panic!("lock poisoned") - } - }; -} - -/// Wraps a `Layer`, allowing it to be reloaded dynamically at runtime. -#[derive(Debug)] -pub(crate) struct Layer { - // TODO(eliza): this once used a `crossbeam_util::ShardedRwLock`. We may - // eventually wish to replace it with a sharded lock implementation on top - // of our internal `RwLock` wrapper type. If possible, we should profile - // this first to determine if it's necessary. - inner: Arc>, - _s: PhantomData, -} - -/// Allows reloading the state of an associated `Layer`. -#[derive(Debug)] -pub(crate) struct Handle { - inner: Weak>, - _s: PhantomData, -} - -/// Indicates that an error occurred when reloading a layer. -#[derive(Debug)] -pub struct Error { - kind: ErrorKind, -} - -#[derive(Debug)] -enum ErrorKind { - SubscriberGone, - Poisoned, -} - -// ===== impl Layer ===== - -impl tracing_subscriber::Layer for Layer -where - L: tracing_subscriber::Layer + 'static, - S: Subscriber, -{ - fn on_layer(&mut self, subscriber: &mut S) { - try_lock!(self.inner.write(), else return).on_layer(subscriber); - } - - #[inline] - fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { - try_lock!(self.inner.read(), else return Interest::sometimes()).register_callsite(metadata) - } - - #[inline] - fn enabled(&self, metadata: &Metadata<'_>, ctx: layer::Context<'_, S>) -> bool { - try_lock!(self.inner.read(), else return false).enabled(metadata, ctx) - } - - #[inline] - fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: layer::Context<'_, S>) { - try_lock!(self.inner.read()).on_new_span(attrs, id, ctx) - } - - #[inline] - fn on_record(&self, span: &span::Id, values: &span::Record<'_>, ctx: layer::Context<'_, S>) { - try_lock!(self.inner.read()).on_record(span, values, ctx) - } - - #[inline] - fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: layer::Context<'_, S>) { - try_lock!(self.inner.read()).on_follows_from(span, follows, ctx) - } - - #[inline] - fn on_event(&self, event: &Event<'_>, ctx: layer::Context<'_, S>) { - try_lock!(self.inner.read()).on_event(event, ctx) - } - - #[inline] - fn on_enter(&self, id: &span::Id, ctx: layer::Context<'_, S>) { - try_lock!(self.inner.read()).on_enter(id, ctx) - } - - #[inline] - fn on_exit(&self, id: &span::Id, ctx: layer::Context<'_, S>) { - try_lock!(self.inner.read()).on_exit(id, ctx) - } - - #[inline] - fn on_close(&self, id: span::Id, ctx: layer::Context<'_, S>) { - try_lock!(self.inner.read()).on_close(id, ctx) - } - - #[inline] - fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: layer::Context<'_, S>) { - try_lock!(self.inner.read()).on_id_change(old, new, ctx) - } - - // In the context in which we use this code, it is deemed to - // be safe to return a pointer for the downcast since we "know" - // that the function which we are ultimately pointing at is - // statically defined in the code and will not change over the - // lifetime of the process. - // - // This is not true in the "general case", which I imagine is - // why this is not implemented in the tracing_subscriber crate. - #[inline] - unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> { - self.inner.read().unwrap().downcast_raw(id) - } -} - -impl Layer -where - L: tracing_subscriber::Layer + 'static, - S: Subscriber, -{ - /// Wraps the given `Layer`, returning a `Layer` and a `Handle` that allows - /// the inner type to be modified at runtime. - pub(crate) fn new(inner: L) -> (Self, Handle) { - let this = Self { - inner: Arc::new(RwLock::new(inner)), - _s: PhantomData, - }; - let handle = this.handle(); - (this, handle) - } - - /// Returns a `Handle` that can be used to reload the wrapped `Layer`. - pub(crate) fn handle(&self) -> Handle { - Handle { - inner: Arc::downgrade(&self.inner), - _s: PhantomData, - } - } -} - -// ===== impl Handle ===== - -impl Handle -where - L: tracing_subscriber::Layer + 'static, - S: Subscriber, -{ - /// Replace the current layer with the provided `new_layer`. - pub(crate) fn reload(&self, new_layer: impl Into) -> Result<(), Error> { - self.modify(|layer| { - *layer = new_layer.into(); - }) - } - - /// Invokes a closure with a mutable reference to the current layer, - /// allowing it to be modified in place. - pub(crate) fn modify(&self, f: impl FnOnce(&mut L)) -> Result<(), Error> { - let inner = self.inner.upgrade().ok_or(Error { - kind: ErrorKind::SubscriberGone, - })?; - - let mut lock = try_lock!(inner.write(), else return Err(Error::poisoned())); - f(&mut *lock); - // Release the lock before rebuilding the interest cache, as that - // function will lock the new layer. - drop(lock); - - callsite::rebuild_interest_cache(); - Ok(()) - } - - /// Returns a clone of the layer's current value if it still exists. - /// Otherwise, if the subscriber has been dropped, returns `None`. - #[allow(dead_code)] - pub(crate) fn clone_current(&self) -> Option - where - L: Clone, - { - self.with_current(L::clone).ok() - } - - /// Invokes a closure with a borrowed reference to the current layer, - /// returning the result (or an error if the subscriber no longer exists). - #[allow(dead_code)] - pub(crate) fn with_current(&self, f: impl FnOnce(&L) -> T) -> Result { - let inner = self.inner.upgrade().ok_or(Error { - kind: ErrorKind::SubscriberGone, - })?; - let inner = try_lock!(inner.read(), else return Err(Error::poisoned())); - Ok(f(&*inner)) - } -} - -impl Clone for Handle { - fn clone(&self) -> Self { - Handle { - inner: self.inner.clone(), - _s: PhantomData, - } - } -} - -// ===== impl Error ===== - -impl Error { - fn poisoned() -> Self { - Self { - kind: ErrorKind::Poisoned, - } - } - - /// Returns `true` if this error occurred because the layer was poisoned by - /// a panic on another thread. - pub fn is_poisoned(&self) -> bool { - matches!(self.kind, ErrorKind::Poisoned) - } - - /// Returns `true` if this error occurred because the `Subscriber` - /// containing the reloadable layer was dropped. - pub fn is_dropped(&self) -> bool { - matches!(self.kind, ErrorKind::SubscriberGone) - } -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let msg = match self.kind { - ErrorKind::SubscriberGone => "subscriber no longer exists", - ErrorKind::Poisoned => "lock poisoned", - }; - f.pad(msg) - } -} - -impl error::Error for Error {} diff --git a/apollo-router/src/router.rs b/apollo-router/src/router.rs index c3c817da7f..7808e4929f 100644 --- a/apollo-router/src/router.rs +++ b/apollo-router/src/router.rs @@ -18,6 +18,7 @@ use thiserror::Error; use tokio::sync::RwLock; use tokio::task::spawn; use tracing::subscriber::SetGlobalDefaultError; +use tracing_futures::WithSubscriber; use url::Url; use Event::NoMoreConfiguration; use Event::NoMoreSchema; @@ -29,7 +30,6 @@ use crate::axum_http_server_factory::AxumHttpServerFactory; use crate::configuration::validate_configuration; use crate::configuration::Configuration; use crate::configuration::ListenAddr; -use crate::reload::Error as ReloadError; use crate::router_factory::YamlRouterServiceFactory; use crate::state_machine::StateMachine; @@ -76,9 +76,6 @@ pub enum ApolloRouterError { /// could not set global subscriber: {0} SetGlobalSubscriberError(SetGlobalDefaultError), - - /// could not reload tracing layer: {0} - ReloadTracingLayerError(ReloadError), } /// The user supplied schema. Either a static string or a stream for hot reloading. @@ -375,8 +372,6 @@ pub struct ApolloRouter { #[buildstructor::buildstructor] impl ApolloRouter { /// Build a new Apollo router. - /// - /// This must only be called in the context of Executable::builder() because it relies on custom logging setup to support hot reload. #[builder] pub fn new( configuration: ConfigurationKind, @@ -466,16 +461,20 @@ impl ApolloRouter { let state_machine = StateMachine::new(server_factory, self.router_factory); let listen_address = state_machine.listen_address.clone(); - let result = spawn(async move { state_machine.process_events(event_stream).await }) - .map(|r| match r { - Ok(Ok(ok)) => Ok(ok), - Ok(Err(err)) => Err(err), - Err(err) => { - tracing::error!("{}", err); - Err(ApolloRouterError::StartupError) - } - }) - .boxed(); + let result = spawn( + async move { state_machine.process_events(event_stream).await } + .with_current_subscriber(), + ) + .map(|r| match r { + Ok(Ok(ok)) => Ok(ok), + Ok(Err(err)) => Err(err), + Err(err) => { + tracing::error!("{}", err); + Err(ApolloRouterError::StartupError) + } + }) + .with_current_subscriber() + .boxed(); RouterHandle { result, diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index bb35dceaf1..92066e2e01 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -127,12 +127,11 @@ async fn create_plugins( schema: &Schema, ) -> Result)>, BoxError> { // List of mandatory plugins. Ordering is important!! - let mut mandatory_plugins = vec!["experimental.include_subgraph_errors", "apollo.csrf"]; - - // Telemetry is *only* mandatory if the global subscriber is set - if crate::subscriber::is_global_subscriber_set() { - mandatory_plugins.insert(0, "apollo.telemetry"); - } + let mandatory_plugins = vec![ + "experimental.include_subgraph_errors", + "apollo.csrf", + "apollo.telemetry", + ]; let mut errors = Vec::new(); let plugin_registry = crate::plugin::plugins(); diff --git a/apollo-router/src/state_machine.rs b/apollo-router/src/state_machine.rs index ab0adf736b..e209ac07b5 100644 --- a/apollo-router/src/state_machine.rs +++ b/apollo-router/src/state_machine.rs @@ -187,26 +187,37 @@ where Running { configuration, schema, - router_service_factory: router_service, + router_service_factory, server_handle, }, UpdateConfiguration(new_configuration), ) => { tracing::info!("reloading configuration"); - self.reload_server( - configuration, - schema, - router_service, - server_handle, - Some(Arc::new(*new_configuration)), - None, - ) - .await - .map(|s| { - tracing::info!("reloaded"); - s - }) - .into_ok_or_err2() + if let Err(e) = configuration.is_compatible(&new_configuration) { + tracing::error!("could not reload configuration: {e}"); + + Running { + configuration, + schema, + router_service_factory, + server_handle, + } + } else { + self.reload_server( + configuration, + schema, + router_service_factory, + server_handle, + Some(Arc::new(*new_configuration)), + None, + ) + .await + .map(|s| { + tracing::info!("reloaded"); + s + }) + .into_ok_or_err2() + } } // Anything else we don't care about diff --git a/apollo-router/src/subscriber.rs b/apollo-router/src/subscriber.rs deleted file mode 100644 index 6f87372eb3..0000000000 --- a/apollo-router/src/subscriber.rs +++ /dev/null @@ -1,282 +0,0 @@ -//! The apollo-router Tracing Subscriber. -//! -//! Here are some constraints: -//! - We'd like to use tower to compose our services -//! - We'd like to be able to choose between Json or Text logging -//! - We'd like to use EnvFilter to specify logging parameters -//! - We'd like our configuration to be dynamic/re-loadable -//! -//! This set of constraints, in the round, act to substantially limit -//! our choices in terms of our Subscriber/Layer options with tracing. -//! -//! 1. Tower, in particular the use of buffer(), spawns threads in a -//! separate Tokio runtime. One consequence of this is that we have to -//! set a single global subscriber in order for the spans to be simply -//! propagated. The alternative is having multiple subscribers which -//! must be tracked within our code and then propagated into spawned -//! threads using the Buffer::pair() mechanism. -//! -//! 2. FmtSubscriber is heavily generic. The only viable mechanism for -//! reloading Layers within Tracing is to use the Reload module. However, -//! this won't accept a BoxedLayer, but requires a Concrete Layer and -//! imposes other restrictions on the implementation. RouterSubscriber -//! acts as the concrete implementation and delegates Json/Text decisions -//! to particular configurations of FmtSubscriber composed with an -//! EnvFilter. -//! -//! 3. With dynamic logging configuration, we need a way to register that -//! change. Originally we used multiple subscribers, but see (1) for the -//! problems associate with that. Now we are using a single, global -//! subscriber which supports a Reload layer. We can't use the Reload -//! layer from the tracing-subscriber crate because it doesn't properly -//! downcast when required by the tracing-opentelemetry crate. We've -//! copied the implementation of Reload from the tracing-subscriber crate -//! and added the appropriate downcast support. -//! -//! There is another alternative which we haven't examined which is using -//! Option to enable/disable different Layers based on configuration. -//! That might be a simpler solution than using Reload, but it's not clear -//! how we would control layer enabling at runtime. That may be worth -//! exploring at some point. -//! -//! Summary: -//! - We chose not to use multiple subscribers to make it simpler to write -//! plugins and not have to understand why Buffer::pair() is required. -//! - With a single, generic subscriber, we needed a way to represent that -//! in the code base, hence RouterSubscriber -//! - To make reloading work properly, we had to fork the Reload -//! implementation from tracing-subscriber to add the downcasting support -//! which makes things work. -//! -//! Implementation Notes: -//! -//! In our implemenation of download_raw() in our Reload layer, we rely on -//! the fact that the tracing-opentelemetry implementation behaves as -//! follows: -//! - SpanExt::context() uses a downcast to WithContext which then invokes -//! a function which we "know" is statically defined in the code and -//! cannot change at runtime. This makes it "safe" to unsafely return -//! a pointer and execute through that pointer. -//! We will need to validate that this remains true as the various moving -//! parts change (upgrade) over time. -use std::any::TypeId; - -use once_cell::sync::OnceCell; -use tracing::span::Attributes; -use tracing::span::Record; -use tracing::subscriber::set_global_default; -use tracing::Event as TracingEvent; -use tracing::Id; -use tracing::Metadata; -use tracing::Subscriber; -use tracing_core::span::Current; -use tracing_core::Interest; -use tracing_core::LevelFilter; -use tracing_subscriber::fmt::format::DefaultFields; -use tracing_subscriber::fmt::format::Format; -use tracing_subscriber::fmt::format::Json; -use tracing_subscriber::fmt::format::JsonFields; -use tracing_subscriber::registry::Data; -use tracing_subscriber::registry::LookupSpan; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::FmtSubscriber; -use tracing_subscriber::Layer; - -use crate::reload::Handle; -use crate::reload::Layer as ReloadLayer; -use crate::router::ApolloRouterError; - -pub(crate) type BoxedLayer = Box + Send + Sync>; - -type FmtSubscriberTextEnv = FmtSubscriber; -type FmtSubscriberJsonEnv = FmtSubscriber, EnvFilter>; - -/// Choice of JSON or Text output. -pub enum RouterSubscriber { - JsonSubscriber(FmtSubscriberJsonEnv), - TextSubscriber(FmtSubscriberTextEnv), -} - -impl Subscriber for RouterSubscriber { - // Required to make the trait work - - fn clone_span(&self, id: &Id) -> Id { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.clone_span(id), - RouterSubscriber::TextSubscriber(sub) => sub.clone_span(id), - } - } - - fn try_close(&self, id: Id) -> bool { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.try_close(id), - RouterSubscriber::TextSubscriber(sub) => sub.try_close(id), - } - } - - /// The delegated downcasting model is copied from the implementation - /// of `Subscriber` for `Layered` in the tracing_subscriber crate. - /// The logic appears to be sound, but be wary of problems here. - unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> { - // If downcasting to `Self`, return a pointer to `self`. - if id == TypeId::of::() { - return Some(self as *const _ as *const ()); - } - - // If not downcasting to `Self`, then check the encapsulated - // subscribers to see if we can downcast one of them. - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.downcast_raw(id), - RouterSubscriber::TextSubscriber(sub) => sub.downcast_raw(id), - } - } - - // May not be required to work, but better safe than sorry - - fn current_span(&self) -> Current { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.current_span(), - RouterSubscriber::TextSubscriber(sub) => sub.current_span(), - } - } - - fn drop_span(&self, id: Id) { - // Rather than delegate, call try_close() to avoid deprecation - // complaints - self.try_close(id); - } - - fn max_level_hint(&self) -> Option { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.max_level_hint(), - RouterSubscriber::TextSubscriber(sub) => sub.max_level_hint(), - } - } - - fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.register_callsite(metadata), - RouterSubscriber::TextSubscriber(sub) => sub.register_callsite(metadata), - } - } - - // Required by the trait - - fn enabled(&self, meta: &Metadata<'_>) -> bool { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.enabled(meta), - RouterSubscriber::TextSubscriber(sub) => sub.enabled(meta), - } - } - - fn new_span(&self, attrs: &Attributes<'_>) -> Id { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.new_span(attrs), - RouterSubscriber::TextSubscriber(sub) => sub.new_span(attrs), - } - } - - fn record(&self, span: &Id, values: &Record<'_>) { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.record(span, values), - RouterSubscriber::TextSubscriber(sub) => sub.record(span, values), - } - } - - fn record_follows_from(&self, span: &Id, follows: &Id) { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.record_follows_from(span, follows), - RouterSubscriber::TextSubscriber(sub) => sub.record_follows_from(span, follows), - } - } - - fn event(&self, event: &TracingEvent<'_>) { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.event(event), - RouterSubscriber::TextSubscriber(sub) => sub.event(event), - } - } - - fn enter(&self, id: &Id) { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.enter(id), - RouterSubscriber::TextSubscriber(sub) => sub.enter(id), - } - } - - fn exit(&self, id: &Id) { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.exit(id), - RouterSubscriber::TextSubscriber(sub) => sub.exit(id), - } - } -} - -impl<'a> LookupSpan<'a> for RouterSubscriber { - type Data = Data<'a>; - - fn span_data(&'a self, id: &Id) -> Option<>::Data> { - match self { - RouterSubscriber::JsonSubscriber(sub) => sub.span_data(id), - RouterSubscriber::TextSubscriber(sub) => sub.span_data(id), - } - } -} - -pub(crate) struct BaseLayer; - -// We don't actually need our BaseLayer to do anything. It exists as a holder -// for the layers set by the reporting.rs plugin -impl Layer for BaseLayer where S: Subscriber + for<'span> LookupSpan<'span> {} - -static RELOAD_HANDLE: OnceCell> = OnceCell::new(); - -/// Check if the router reloading global subscriber is set. -pub fn is_global_subscriber_set() -> bool { - matches!(RELOAD_HANDLE.get(), Some(_)) -} - -/// Set the router reloading global subscriber. -/// -/// The provided subscriber is composed with a reloadable layer so that the default -/// global subscriber is now reloadable. -pub fn set_global_subscriber(subscriber: RouterSubscriber) -> Result<(), ApolloRouterError> { - RELOAD_HANDLE - .get_or_try_init(move || { - // First create a boxed BaseLayer - let cl: BoxedLayer = Box::new(BaseLayer {}); - - // Now create a reloading layer from that - let (reloading_layer, handle) = ReloadLayer::new(cl); - - // Box up our reloading layer - let rl: BoxedLayer = Box::new(reloading_layer); - - // Compose that with our subscriber - let composed = rl.with_subscriber(subscriber); - - // Set our subscriber as the global subscriber - set_global_default(composed)?; - - // Return our handle to store in OnceCell - Ok(handle) - }) - .map_err(ApolloRouterError::SetGlobalSubscriberError)?; - Ok(()) -} - -/// Replace the tracing layer. -/// -/// Reload the current tracing layer with new_layer. -pub fn replace_layer(new_layer: BoxedLayer) -> Result<(), ApolloRouterError> { - match RELOAD_HANDLE.get() { - Some(hdl) => { - hdl.reload(new_layer) - .map_err(ApolloRouterError::ReloadTracingLayerError)?; - } - None => { - return Err(ApolloRouterError::NoReloadTracingHandleError); - } - } - Ok(()) -} diff --git a/apollo-router/tests/integration_tests.rs b/apollo-router/tests/integration_tests.rs index 8cc6799f12..383beb8ea6 100644 --- a/apollo-router/tests/integration_tests.rs +++ b/apollo-router/tests/integration_tests.rs @@ -35,6 +35,7 @@ use test_span::prelude::*; use tower::util::BoxCloneService; use tower::BoxError; use tower::ServiceExt; +use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; macro_rules! assert_federated_response { ($query:expr, $service_requests:expr $(,)?) => { @@ -694,14 +695,17 @@ async fn setup_router_and_registry( let counting_registry = CountingServiceRegistry::new(); let subgraphs = schema.subgraphs(); let mut builder = PluggableRouterServiceBuilder::new(schema.clone()).with_configuration(config); - let telemetry_plugin = Telemetry::new(PluginInit::new( - telemetry::config::Conf { - metrics: Option::default(), - tracing: Some(Tracing::default()), - apollo: Some(apollo::Config::default()), - }, - Default::default(), - )) + let telemetry_plugin = Telemetry::new_with_subscriber( + PluginInit::new( + telemetry::config::Conf { + metrics: Option::default(), + tracing: Some(Tracing::default()), + apollo: Some(apollo::Config::default()), + }, + Default::default(), + ), + Some(tracing_subscriber::registry().with(test_span::Layer {})), + ) .await .unwrap(); let csrf_plugin = csrf::Csrf::new(PluginInit::new(Default::default(), Default::default())) diff --git a/apollo-router/tests/snapshots/integration_tests__traced_basic_composition.snap b/apollo-router/tests/snapshots/integration_tests__traced_basic_composition.snap index 0cba8ec42c..a711527d8a 100644 --- a/apollo-router/tests/snapshots/integration_tests__traced_basic_composition.snap +++ b/apollo-router/tests/snapshots/integration_tests__traced_basic_composition.snap @@ -1,6 +1,6 @@ --- source: apollo-router/tests/integration_tests.rs -assertion_line: 160 +assertion_line: 161 expression: get_spans() --- { @@ -132,10 +132,6 @@ expression: get_spans() [ "otel.kind", "internal" - ], - [ - "message", - "flushing telemetry" ] ], "metadata": { diff --git a/apollo-router/tests/snapshots/integration_tests__traced_basic_request.snap b/apollo-router/tests/snapshots/integration_tests__traced_basic_request.snap index aee2880933..cdaded0ed0 100644 --- a/apollo-router/tests/snapshots/integration_tests__traced_basic_request.snap +++ b/apollo-router/tests/snapshots/integration_tests__traced_basic_request.snap @@ -1,6 +1,6 @@ --- source: apollo-router/tests/integration_tests.rs -assertion_line: 146 +assertion_line: 147 expression: get_spans() --- { @@ -132,10 +132,6 @@ expression: get_spans() [ "otel.kind", "internal" - ], - [ - "message", - "flushing telemetry" ] ], "metadata": { diff --git a/apollo-router/tests/telemetry_test.rs b/apollo-router/tests/telemetry_test.rs index 35eba71b1d..ba67d06b6b 100644 --- a/apollo-router/tests/telemetry_test.rs +++ b/apollo-router/tests/telemetry_test.rs @@ -4,18 +4,6 @@ use apollo_router::__create_test_service_factory_from_yaml; // be encountered. (See https://github.com/open-telemetry/opentelemetry-rust/issues/536) #[tokio::test(flavor = "multi_thread")] async fn test_telemetry_doesnt_hang_with_invalid_schema() { - use apollo_router::subscriber::set_global_subscriber; - use apollo_router::subscriber::RouterSubscriber; - use tracing_subscriber::EnvFilter; - - // A global subscriber must be set before we start up the telemetry plugin - let _ = set_global_subscriber(RouterSubscriber::JsonSubscriber( - tracing_subscriber::fmt::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .json() - .finish(), - )); - __create_test_service_factory_from_yaml( include_str!("../src/testdata/invalid_supergraph.graphql"), r#" diff --git a/examples/embedded/Cargo.toml b/examples/embedded/Cargo.toml index a5222329b6..380bca6382 100644 --- a/examples/embedded/Cargo.toml +++ b/examples/embedded/Cargo.toml @@ -12,4 +12,3 @@ futures = "0.3" serde_json = "1" tokio = { version = "1", features = ["full"] } tower = "0.4" -tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } diff --git a/examples/embedded/src/main.rs b/examples/embedded/src/main.rs index 8969cd1d20..f97aed7b0d 100644 --- a/examples/embedded/src/main.rs +++ b/examples/embedded/src/main.rs @@ -5,19 +5,10 @@ use anyhow::Result; use apollo_router::services::PluggableRouterServiceBuilder; use apollo_router::services::RouterRequest; use apollo_router::services::SubgraphService; -use apollo_router::subscriber::set_global_subscriber; -use apollo_router::subscriber::RouterSubscriber; use tower::ServiceExt; -use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<()> { - // set up console logs - let builder = tracing_subscriber::fmt::fmt() - .with_env_filter(EnvFilter::try_new("info").expect("could not parse log")); - - set_global_subscriber(RouterSubscriber::TextSubscriber(builder.finish()))?; - // get the supergraph from ../../examples/graphql/supergraph.graphql let schema = include_str!("../../graphql/supergraph.graphql"); let schema = Arc::new(apollo_router::Schema::parse(schema, &Default::default())?);