Skip to content

Commit

Permalink
propagate spans to tasks spawned by Hyper (#550)
Browse files Browse the repository at this point in the history
Previously, in the `futures` 0.1 version of the proxy, we provided
`hyper` with a custom executor to propagate `tracing` spans to tasks
spawned by `hyper`. In the process of updating the proxy to
`std::future`, this was removed, meaning that any tasks spawned by hyper
are missing their spans. This makes events recorded by `h2` and so on
hard to debug, especially when many connections are open.

This branch fixes this by adding an implementation of `hyper::Executor`
that propagates the current span when spawning a task.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw authored Jun 8, 2020
1 parent 3d56fdf commit 6105918
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 11 deletions.
8 changes: 4 additions & 4 deletions linkerd/app/core/src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
http::{
glue::{Body, HyperServerSvc},
h2::Settings as H2Settings,
upgrade, Version as HttpVersion,
trace, upgrade, Version as HttpVersion,
},
},
svc::{NewService, Service, ServiceExt},
Expand Down Expand Up @@ -89,7 +89,7 @@ where
H: NewService<tls::accept::Meta>,
H::Service: Service<http::Request<Body>, Response = http::Response<B>>,
{
http: hyper::server::conn::Http,
http: hyper::server::conn::Http<trace::Executor>,
h2_settings: H2Settings,
transport_labels: L,
transport_metrics: transport::Metrics,
Expand All @@ -115,7 +115,7 @@ where
drain: drain::Watch,
) -> Self {
Self {
http: hyper::server::conn::Http::new(),
http: hyper::server::conn::Http::new().with_executor(trace::Executor::new()),
h2_settings,
transport_labels,
transport_metrics,
Expand Down Expand Up @@ -199,7 +199,7 @@ where
.http1_only(true)
.serve_connection(io, HyperServerSvc::new(svc))
.with_upgrades()
.instrument(info_span!("h2"));
.instrument(info_span!("h1"));

Ok(Box::pin(async move {
drain
Expand Down
9 changes: 5 additions & 4 deletions linkerd/app/integration/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use linkerd2_app_core::proxy::http::trace;
use rustls::ClientConfig;
use std::io;
use std::sync::Arc;
Expand Down Expand Up @@ -237,11 +238,11 @@ fn run(addr: SocketAddr, version: Run, tls: Option<TlsConfig>) -> (Sender, Runni
};

let span = info_span!("test client", peer_addr = %addr, ?version, test = %test_name);
let client = hyper::Client::builder()
.http2_only(http2_only)
.build::<Conn, hyper::Body>(conn);

let work = async move {
let client = hyper::Client::builder()
.http2_only(http2_only)
.executor(trace::Executor::new())
.build::<Conn, hyper::Body>(conn);
tracing::trace!("client task started");
let mut rx = rx;
while let Some((req, cb)) = rx.recv().await {
Expand Down
4 changes: 3 additions & 1 deletion linkerd/app/integration/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::*;
use futures::TryFuture;
use http::Response;
use linkerd2_app_core::proxy::http::trace;
use rustls::ServerConfig;
use std::collections::HashMap;
use std::future::Future;
Expand Down Expand Up @@ -186,7 +187,8 @@ impl Server {
let _subscriber = subscriber.set_default();
tracing::info!("support server running");
let mut new_svc = NewSvc(Arc::new(self.routes));
let mut http = hyper::server::conn::Http::new();
let mut http =
hyper::server::conn::Http::new().with_executor(trace::Executor::new());
match self.version {
Run::Http1 => http.http1_only(true),
Run::Http2 => http.http2_only(true),
Expand Down
2 changes: 2 additions & 0 deletions linkerd/proxy/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::upgrade::{Http11Upgrade, HttpConnect};
use super::{
h1, h2,
settings::{HasSettings, Settings},
trace,
};
use futures::{ready, TryFuture};
use http;
Expand Down Expand Up @@ -145,6 +146,7 @@ where
}
// hyper should only try to automatically
// set the host if the request was in absolute_form
.executor(trace::Executor::new())
.set_host(was_absolute_form)
.build(HyperConnect::new(connect, target, was_absolute_form));
MakeFuture::Http1(Some(h1))
Expand Down
2 changes: 2 additions & 0 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::trace;
use super::Body;
use futures::{ready, TryFuture, TryFutureExt};
use http;
Expand Down Expand Up @@ -156,6 +157,7 @@ where
.http2_initial_connection_window_size(
this.h2_settings.initial_connection_window_size,
)
.executor(trace::Executor::new())
.handshake(io)
.instrument(info_span!("h2"));

Expand Down
1 change: 1 addition & 0 deletions linkerd/proxy/http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod override_authority;
pub mod settings;
pub mod strip_header;
pub mod timeout;
pub mod trace;
pub mod upgrade;
mod version;

Expand Down
25 changes: 25 additions & 0 deletions linkerd/proxy/http/src/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::future::Future;
use tracing_futures::Instrument;

#[derive(Clone, Debug)]
pub struct Executor {
_p: (),
}

impl Executor {
#[inline]
pub fn new() -> Self {
Self { _p: () }
}
}

impl<F> hyper::rt::Executor<F> for Executor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
#[inline]
fn execute(&self, f: F) {
tokio::spawn(f.in_current_span());
}
}
4 changes: 2 additions & 2 deletions linkerd/proxy/tap/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use linkerd2_identity as identity;
use linkerd2_proxy_api::tap::server::{Tap, TapServer};
use linkerd2_proxy_http::{
grpc::{req_box_body, res_body_as_payload},
HyperServerSvc,
trace, HyperServerSvc,
};
use linkerd2_proxy_transport::io::BoxedIo;
use linkerd2_proxy_transport::tls::{
Expand Down Expand Up @@ -39,10 +39,10 @@ impl AcceptPermittedClients {
let svc =
res_body_as_payload::Service::new(req_box_body::Service::new(TapServer::new(tap)));

// TODO do we need to set a contextual tracing executor on Hyper?
ServeFuture(Box::new(
hyper::server::conn::Http::new()
.http2_only(true)
.with_executor(trace::Executor::new())
.serve_connection(io, HyperServerSvc::new(svc))
.map_err(Into::into),
))
Expand Down

0 comments on commit 6105918

Please sign in to comment.