Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): Add tracing support to server #175

Merged
merged 3 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tonic-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ path = "src/multiplex/client.rs"
name = "gcp-client"
path = "src/gcp/client.rs"

[[bin]]
name = "tracing-client"
path = "src/tracing/client.rs"

[[bin]]
name = "tracing-server"
path = "src/tracing/server.rs"

[dependencies]
tonic = { path = "../tonic", features = ["tls"] }
bytes = "0.4"
Expand All @@ -82,6 +90,12 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.6"

# Tracing
tracing = "0.1"
tracing-subscriber = { version = "0.2.0-alpha", features = ["tracing-log"] }
tracing-attributes = "0.1"
tracing-futures = "0.2"

# Required for wellknown types
prost-types = "0.5"

Expand Down
38 changes: 38 additions & 0 deletions tonic-examples/src/tracing/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
pub mod hello_world {
tonic::include_proto!("helloworld");
}

use hello_world::{greeter_client::GreeterClient, HelloRequest};
use tracing_attributes::instrument;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.init();

say_hi("Bob".into()).await?;

Ok(())
}

#[instrument]
async fn say_hi(name: String) -> Result<(), Box<dyn std::error::Error>> {
let mut client = GreeterClient::connect("http://[::1]:50051").await?;

let request = tonic::Request::new(HelloRequest { name });

tracing::info!(
message = "Sending request.",
request = %request.get_ref().name
);

let response = client.say_hello(request).await?;

tracing::info!(
message = "Got a response.",
response = %response.get_ref().message
);

Ok(())
}
51 changes: 51 additions & 0 deletions tonic-examples/src/tracing/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use tonic::{transport::Server, Request, Response, Status};

pub mod hello_world {
tonic::include_proto!("helloworld");
}

use hello_world::{
greeter_server::{Greeter, GreeterServer},
HelloReply, HelloRequest,
};

#[derive(Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
tracing::info!(message = "Inbound request.", metadata = ?request.metadata());

let reply = hello_world::HelloReply {
message: format!("Hello {}!", request.into_inner().name).into(),
};

tracing::debug!(message = "Sending reply.", response = %reply.message);

Ok(Response::new(reply))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.init();

let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();

tracing::info!(message = "Starting server.", %addr);

Server::builder()
.trace_fn(|_| tracing::info_span!("helloworld_server"))
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;

Ok(())
}
2 changes: 1 addition & 1 deletion tonic-interop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ console = "0.9"
structopt = "0.2"

tracing = "0.1"
tracing-subscriber = "0.1.3"
tracing-subscriber = "0.2.0-alpha"
tracing-log = "0.1.0"

[build-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ transport = [
"tower",
"tower-balance",
"tower-load",
"tracing-futures",
]
tls = ["tokio-rustls"]
tls-roots = ["rustls-native-certs"]
Expand Down Expand Up @@ -68,6 +69,7 @@ tower = { git = "https://github.com/tower-rs/tower", optional = true}
tower-make = { version = "0.3", features = ["connect"] }
tower-balance = { git = "https://github.com/tower-rs/tower", optional = true }
tower-load = { git = "https://github.com/tower-rs/tower", optional = true }
tracing-futures = { version = "0.2", optional = true }

# rustls
tokio-rustls = { version = "0.12", optional = true }
Expand Down
25 changes: 25 additions & 0 deletions tonic/src/transport/server/future copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[pin_project]
pub struct ResponseFuture<F> {
inner: F,
span: Option<Span>,
}

impl<F: Future> Future for ResponseFuture<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();

if let Some(span) = me.span.clone().take() {
let _enter = span.enter();
// me.poll(cx).map_err(Into::into)
}
}
}
25 changes: 25 additions & 0 deletions tonic/src/transport/server/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[pin_project]
pub struct ResponseFuture<F> {
inner: F,
span: Option<Span>,
}

impl<F: Future> Future for ResponseFuture<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();

if let Some(span) = me.span.clone().take() {
let _enter = span.enter();
// me.poll(cx).map_err(Into::into)
}
}
}
50 changes: 41 additions & 9 deletions tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures_util::{
future::{self, poll_fn, MapErr},
TryFutureExt,
};
use http::{Request, Response};
use http::{HeaderMap, Request, Response};
use hyper::{
server::{accept::Accept, conn},
Body,
Expand All @@ -39,9 +39,11 @@ use tower::{
};
#[cfg(feature = "tls")]
use tracing::error;
use tracing_futures::{Instrument, Instrumented};

type BoxService = tower::util::BoxService<Request<Body>, Response<BoxBody>, crate::Error>;
type Interceptor = Arc<dyn Layer<BoxService, Service = BoxService> + Send + Sync + 'static>;
type TraceInterceptor = Arc<dyn Fn(&HeaderMap) -> tracing::Span + Send + Sync + 'static>;

/// A default batteries included `transport` server.
///
Expand All @@ -54,6 +56,7 @@ type Interceptor = Arc<dyn Layer<BoxService, Service = BoxService> + Send + Sync
#[derive(Default, Clone)]
pub struct Server {
interceptor: Option<Interceptor>,
trace_interceptor: Option<TraceInterceptor>,
concurrency_limit: Option<usize>,
// timeout: Option<Duration>,
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -211,6 +214,17 @@ impl Server {
}
}

/// Intercept inbound headers and add a [`tracing::Span`] to each response future.
pub fn trace_fn<F>(self, f: F) -> Self
where
F: Fn(&HeaderMap) -> tracing::Span + Send + Sync + 'static,
{
Server {
trace_interceptor: Some(Arc::new(f)),
..self
}
}

/// Create a router with the `S` typed service as the first service.
///
/// This will clone the `Server` builder and create a router that will
Expand Down Expand Up @@ -241,6 +255,7 @@ impl Server {
F: Future<Output = ()>,
{
let interceptor = self.interceptor.clone();
let span = self.trace_interceptor.clone();
let concurrency_limit = self.concurrency_limit;
let init_connection_window_size = self.init_connection_window_size;
let init_stream_window_size = self.init_stream_window_size;
Expand Down Expand Up @@ -282,6 +297,7 @@ impl Server {
interceptor,
concurrency_limit,
// timeout,
span,
};

let server = hyper::Server::builder(incoming)
Expand Down Expand Up @@ -399,8 +415,10 @@ impl fmt::Debug for Server {
}
}

#[derive(Debug)]
struct Svc<S>(S);
struct Svc<S> {
inner: S,
span: Option<TraceInterceptor>,
}

impl<S> Service<Request<Body>> for Svc<S>
where
Expand All @@ -409,14 +427,26 @@ where
{
type Response = Response<BoxBody>;
type Error = crate::Error;
type Future = MapErr<S::Future, fn(S::Error) -> crate::Error>;
type Future = MapErr<Instrumented<S::Future>, fn(S::Error) -> crate::Error>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
self.0.call(req).map_err(|e| e.into())
let span = if let Some(trace_interceptor) = &self.span {
trace_interceptor(req.headers())
} else {
tracing::Span::none()
};

self.inner.call(req).instrument(span).map_err(|e| e.into())
}
}

impl<S> fmt::Debug for Svc<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Svc").finish()
}
}

Expand All @@ -425,6 +455,7 @@ struct MakeSvc<S> {
concurrency_limit: Option<usize>,
// timeout: Option<Duration>,
inner: S,
span: Option<TraceInterceptor>,
}

impl<S, T> Service<T> for MakeSvc<S>
Expand All @@ -447,6 +478,7 @@ where
let svc = self.inner.clone();
let concurrency_limit = self.concurrency_limit;
// let timeout = self.timeout.clone();
let span = self.span.clone();

Box::pin(async move {
let svc = ServiceBuilder::new()
Expand All @@ -455,10 +487,10 @@ where
.service(svc);

let svc = if let Some(interceptor) = interceptor {
let layered = interceptor.layer(BoxService::new(Svc(svc)));
BoxService::new(Svc(layered))
let layered = interceptor.layer(BoxService::new(Svc { inner: svc, span }));
BoxService::new(layered)
} else {
BoxService::new(Svc(svc))
BoxService::new(Svc { inner: svc, span })
};

Ok(svc)
Expand Down