From f46a45401d42f6c8b6ab449f7462735a9aea0bfc Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 11 Dec 2019 19:45:11 -0500 Subject: [PATCH] feat(transport): Add tracing support to server (#175) * feat(transport): Add tracing to server * Add tracing example * Fix duplicate versions of tracing-subscriber --- tonic-examples/Cargo.toml | 14 +++++++ tonic-examples/src/tracing/client.rs | 38 +++++++++++++++++ tonic-examples/src/tracing/server.rs | 51 +++++++++++++++++++++++ tonic-interop/Cargo.toml | 2 +- tonic/Cargo.toml | 2 + tonic/src/transport/server/future copy.rs | 25 +++++++++++ tonic/src/transport/server/future.rs | 25 +++++++++++ tonic/src/transport/server/mod.rs | 50 ++++++++++++++++++---- 8 files changed, 197 insertions(+), 10 deletions(-) create mode 100644 tonic-examples/src/tracing/client.rs create mode 100644 tonic-examples/src/tracing/server.rs create mode 100644 tonic/src/transport/server/future copy.rs create mode 100644 tonic/src/transport/server/future.rs diff --git a/tonic-examples/Cargo.toml b/tonic-examples/Cargo.toml index 615b19440..b50b560a9 100644 --- a/tonic-examples/Cargo.toml +++ b/tonic-examples/Cargo.toml @@ -66,6 +66,14 @@ path = "src/multiplex/client.rs" name = "gcp-client" path = "src/gcp/client.rs" +[[bin]] +name = "tracing-client" +path = "src/tracing/client.rs" + +[[bin]] +name = "tracing-server" +path = "src/tracing/server.rs" + [dependencies] tonic = { path = "../tonic", features = ["tls"] } bytes = "0.4" @@ -82,6 +90,12 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" rand = "0.6" +# Tracing +tracing = "0.1" +tracing-subscriber = { version = "0.2.0-alpha", features = ["tracing-log"] } +tracing-attributes = "0.1" +tracing-futures = "0.2" + # Required for wellknown types prost-types = "0.5" diff --git a/tonic-examples/src/tracing/client.rs b/tonic-examples/src/tracing/client.rs new file mode 100644 index 000000000..825c4a3a3 --- /dev/null +++ b/tonic-examples/src/tracing/client.rs @@ -0,0 +1,38 @@ +pub mod hello_world { + tonic::include_proto!("helloworld"); +} + +use hello_world::{greeter_client::GreeterClient, HelloRequest}; +use tracing_attributes::instrument; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::DEBUG) + .init(); + + say_hi("Bob".into()).await?; + + Ok(()) +} + +#[instrument] +async fn say_hi(name: String) -> Result<(), Box> { + let mut client = GreeterClient::connect("http://[::1]:50051").await?; + + let request = tonic::Request::new(HelloRequest { name }); + + tracing::info!( + message = "Sending request.", + request = %request.get_ref().name + ); + + let response = client.say_hello(request).await?; + + tracing::info!( + message = "Got a response.", + response = %response.get_ref().message + ); + + Ok(()) +} diff --git a/tonic-examples/src/tracing/server.rs b/tonic-examples/src/tracing/server.rs new file mode 100644 index 000000000..cc078d75d --- /dev/null +++ b/tonic-examples/src/tracing/server.rs @@ -0,0 +1,51 @@ +use tonic::{transport::Server, Request, Response, Status}; + +pub mod hello_world { + tonic::include_proto!("helloworld"); +} + +use hello_world::{ + greeter_server::{Greeter, GreeterServer}, + HelloReply, HelloRequest, +}; + +#[derive(Default)] +pub struct MyGreeter {} + +#[tonic::async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + tracing::info!(message = "Inbound request.", metadata = ?request.metadata()); + + let reply = hello_world::HelloReply { + message: format!("Hello {}!", request.into_inner().name).into(), + }; + + tracing::debug!(message = "Sending reply.", response = %reply.message); + + Ok(Response::new(reply)) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::DEBUG) + .init(); + + let addr = "[::1]:50051".parse().unwrap(); + let greeter = MyGreeter::default(); + + tracing::info!(message = "Starting server.", %addr); + + Server::builder() + .trace_fn(|_| tracing::info_span!("helloworld_server")) + .add_service(GreeterServer::new(greeter)) + .serve(addr) + .await?; + + Ok(()) +} diff --git a/tonic-interop/Cargo.toml b/tonic-interop/Cargo.toml index 1c4af9b0a..e364041b7 100644 --- a/tonic-interop/Cargo.toml +++ b/tonic-interop/Cargo.toml @@ -32,7 +32,7 @@ console = "0.9" structopt = "0.2" tracing = "0.1" -tracing-subscriber = "0.1.3" +tracing-subscriber = "0.2.0-alpha" tracing-log = "0.1.0" [build-dependencies] diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 8f4e35eae..bfce4dab8 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -31,6 +31,7 @@ transport = [ "tower", "tower-balance", "tower-load", + "tracing-futures", ] tls = ["tokio-rustls"] tls-roots = ["rustls-native-certs"] @@ -68,6 +69,7 @@ tower = { git = "https://github.com/tower-rs/tower", optional = true} tower-make = { version = "0.3", features = ["connect"] } tower-balance = { git = "https://github.com/tower-rs/tower", optional = true } tower-load = { git = "https://github.com/tower-rs/tower", optional = true } +tracing-futures = { version = "0.2", optional = true } # rustls tokio-rustls = { version = "0.12", optional = true } diff --git a/tonic/src/transport/server/future copy.rs b/tonic/src/transport/server/future copy.rs new file mode 100644 index 000000000..9cbcdc3b9 --- /dev/null +++ b/tonic/src/transport/server/future copy.rs @@ -0,0 +1,25 @@ +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +#[pin_project] +pub struct ResponseFuture { + inner: F, + span: Option, +} + +impl Future for ResponseFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + + if let Some(span) = me.span.clone().take() { + let _enter = span.enter(); + // me.poll(cx).map_err(Into::into) + } + } +} diff --git a/tonic/src/transport/server/future.rs b/tonic/src/transport/server/future.rs new file mode 100644 index 000000000..9cbcdc3b9 --- /dev/null +++ b/tonic/src/transport/server/future.rs @@ -0,0 +1,25 @@ +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +#[pin_project] +pub struct ResponseFuture { + inner: F, + span: Option, +} + +impl Future for ResponseFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + + if let Some(span) = me.span.clone().take() { + let _enter = span.enter(); + // me.poll(cx).map_err(Into::into) + } + } +} diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 1f4ec8705..b2c89c186 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -15,7 +15,7 @@ use futures_util::{ future::{self, poll_fn, MapErr}, TryFutureExt, }; -use http::{Request, Response}; +use http::{HeaderMap, Request, Response}; use hyper::{ server::{accept::Accept, conn}, Body, @@ -39,9 +39,11 @@ use tower::{ }; #[cfg(feature = "tls")] use tracing::error; +use tracing_futures::{Instrument, Instrumented}; type BoxService = tower::util::BoxService, Response, crate::Error>; type Interceptor = Arc + Send + Sync + 'static>; +type TraceInterceptor = Arc tracing::Span + Send + Sync + 'static>; /// A default batteries included `transport` server. /// @@ -54,6 +56,7 @@ type Interceptor = Arc + Send + Sync #[derive(Default, Clone)] pub struct Server { interceptor: Option, + trace_interceptor: Option, concurrency_limit: Option, // timeout: Option, #[cfg(feature = "tls")] @@ -211,6 +214,17 @@ impl Server { } } + /// Intercept inbound headers and add a [`tracing::Span`] to each response future. + pub fn trace_fn(self, f: F) -> Self + where + F: Fn(&HeaderMap) -> tracing::Span + Send + Sync + 'static, + { + Server { + trace_interceptor: Some(Arc::new(f)), + ..self + } + } + /// Create a router with the `S` typed service as the first service. /// /// This will clone the `Server` builder and create a router that will @@ -241,6 +255,7 @@ impl Server { F: Future, { let interceptor = self.interceptor.clone(); + let span = self.trace_interceptor.clone(); let concurrency_limit = self.concurrency_limit; let init_connection_window_size = self.init_connection_window_size; let init_stream_window_size = self.init_stream_window_size; @@ -282,6 +297,7 @@ impl Server { interceptor, concurrency_limit, // timeout, + span, }; let server = hyper::Server::builder(incoming) @@ -399,8 +415,10 @@ impl fmt::Debug for Server { } } -#[derive(Debug)] -struct Svc(S); +struct Svc { + inner: S, + span: Option, +} impl Service> for Svc where @@ -409,14 +427,26 @@ where { type Response = Response; type Error = crate::Error; - type Future = MapErr crate::Error>; + type Future = MapErr, fn(S::Error) -> crate::Error>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx).map_err(Into::into) + self.inner.poll_ready(cx).map_err(Into::into) } fn call(&mut self, req: Request) -> Self::Future { - self.0.call(req).map_err(|e| e.into()) + let span = if let Some(trace_interceptor) = &self.span { + trace_interceptor(req.headers()) + } else { + tracing::Span::none() + }; + + self.inner.call(req).instrument(span).map_err(|e| e.into()) + } +} + +impl fmt::Debug for Svc { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Svc").finish() } } @@ -425,6 +455,7 @@ struct MakeSvc { concurrency_limit: Option, // timeout: Option, inner: S, + span: Option, } impl Service for MakeSvc @@ -447,6 +478,7 @@ where let svc = self.inner.clone(); let concurrency_limit = self.concurrency_limit; // let timeout = self.timeout.clone(); + let span = self.span.clone(); Box::pin(async move { let svc = ServiceBuilder::new() @@ -455,10 +487,10 @@ where .service(svc); let svc = if let Some(interceptor) = interceptor { - let layered = interceptor.layer(BoxService::new(Svc(svc))); - BoxService::new(Svc(layered)) + let layered = interceptor.layer(BoxService::new(Svc { inner: svc, span })); + BoxService::new(layered) } else { - BoxService::new(Svc(svc)) + BoxService::new(Svc { inner: svc, span }) }; Ok(svc)