diff --git a/linkerd/app/core/src/proxy/server.rs b/linkerd/app/core/src/proxy/server.rs index 85540cf471..7758abd2e5 100644 --- a/linkerd/app/core/src/proxy/server.rs +++ b/linkerd/app/core/src/proxy/server.rs @@ -6,7 +6,7 @@ use crate::{ http::{ glue::{Body, HyperServerSvc}, h2::Settings as H2Settings, - upgrade, Version as HttpVersion, + trace, upgrade, Version as HttpVersion, }, }, svc::{NewService, Service, ServiceExt}, @@ -89,7 +89,7 @@ where H: NewService, H::Service: Service, Response = http::Response>, { - http: hyper::server::conn::Http, + http: hyper::server::conn::Http, h2_settings: H2Settings, transport_labels: L, transport_metrics: transport::Metrics, @@ -115,7 +115,7 @@ where drain: drain::Watch, ) -> Self { Self { - http: hyper::server::conn::Http::new(), + http: hyper::server::conn::Http::new().with_executor(trace::Executor::new()), h2_settings, transport_labels, transport_metrics, @@ -199,7 +199,7 @@ where .http1_only(true) .serve_connection(io, HyperServerSvc::new(svc)) .with_upgrades() - .instrument(info_span!("h2")); + .instrument(info_span!("h1")); Ok(Box::pin(async move { drain diff --git a/linkerd/app/integration/src/client.rs b/linkerd/app/integration/src/client.rs index 6800ca811b..1c3024a93f 100644 --- a/linkerd/app/integration/src/client.rs +++ b/linkerd/app/integration/src/client.rs @@ -1,4 +1,5 @@ use super::*; +use linkerd2_app_core::proxy::http::trace; use rustls::ClientConfig; use std::io; use std::sync::Arc; @@ -237,11 +238,11 @@ fn run(addr: SocketAddr, version: Run, tls: Option) -> (Sender, Runni }; let span = info_span!("test client", peer_addr = %addr, ?version, test = %test_name); - let client = hyper::Client::builder() - .http2_only(http2_only) - .build::(conn); - let work = async move { + let client = hyper::Client::builder() + .http2_only(http2_only) + .executor(trace::Executor::new()) + .build::(conn); tracing::trace!("client task started"); let mut rx = rx; while let Some((req, cb)) = rx.recv().await { diff --git a/linkerd/app/integration/src/server.rs b/linkerd/app/integration/src/server.rs index 52628a2e23..5b89434bf3 100644 --- a/linkerd/app/integration/src/server.rs +++ b/linkerd/app/integration/src/server.rs @@ -1,6 +1,7 @@ use super::*; use futures::TryFuture; use http::Response; +use linkerd2_app_core::proxy::http::trace; use rustls::ServerConfig; use std::collections::HashMap; use std::future::Future; @@ -186,7 +187,8 @@ impl Server { let _subscriber = subscriber.set_default(); tracing::info!("support server running"); let mut new_svc = NewSvc(Arc::new(self.routes)); - let mut http = hyper::server::conn::Http::new(); + let mut http = + hyper::server::conn::Http::new().with_executor(trace::Executor::new()); match self.version { Run::Http1 => http.http1_only(true), Run::Http2 => http.http2_only(true), diff --git a/linkerd/proxy/http/src/client.rs b/linkerd/proxy/http/src/client.rs index e5f14c1691..8231e8a087 100644 --- a/linkerd/proxy/http/src/client.rs +++ b/linkerd/proxy/http/src/client.rs @@ -3,6 +3,7 @@ use super::upgrade::{Http11Upgrade, HttpConnect}; use super::{ h1, h2, settings::{HasSettings, Settings}, + trace, }; use futures::{ready, TryFuture}; use http; @@ -145,6 +146,7 @@ where } // hyper should only try to automatically // set the host if the request was in absolute_form + .executor(trace::Executor::new()) .set_host(was_absolute_form) .build(HyperConnect::new(connect, target, was_absolute_form)); MakeFuture::Http1(Some(h1)) diff --git a/linkerd/proxy/http/src/h2.rs b/linkerd/proxy/http/src/h2.rs index e7ebc8d0cc..8fa6ce7bcd 100644 --- a/linkerd/proxy/http/src/h2.rs +++ b/linkerd/proxy/http/src/h2.rs @@ -1,3 +1,4 @@ +use super::trace; use super::Body; use futures::{ready, TryFuture, TryFutureExt}; use http; @@ -156,6 +157,7 @@ where .http2_initial_connection_window_size( this.h2_settings.initial_connection_window_size, ) + .executor(trace::Executor::new()) .handshake(io) .instrument(info_span!("h2")); diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index 4be1b21971..bdc5253360 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -19,6 +19,7 @@ pub mod override_authority; pub mod settings; pub mod strip_header; pub mod timeout; +pub mod trace; pub mod upgrade; mod version; diff --git a/linkerd/proxy/http/src/trace.rs b/linkerd/proxy/http/src/trace.rs new file mode 100644 index 0000000000..bde36dde99 --- /dev/null +++ b/linkerd/proxy/http/src/trace.rs @@ -0,0 +1,25 @@ +use std::future::Future; +use tracing_futures::Instrument; + +#[derive(Clone, Debug)] +pub struct Executor { + _p: (), +} + +impl Executor { + #[inline] + pub fn new() -> Self { + Self { _p: () } + } +} + +impl hyper::rt::Executor for Executor +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + #[inline] + fn execute(&self, f: F) { + tokio::spawn(f.in_current_span()); + } +} diff --git a/linkerd/proxy/tap/src/accept.rs b/linkerd/proxy/tap/src/accept.rs index 028fe95033..208941ec40 100644 --- a/linkerd/proxy/tap/src/accept.rs +++ b/linkerd/proxy/tap/src/accept.rs @@ -5,7 +5,7 @@ use linkerd2_identity as identity; use linkerd2_proxy_api::tap::server::{Tap, TapServer}; use linkerd2_proxy_http::{ grpc::{req_box_body, res_body_as_payload}, - HyperServerSvc, + trace, HyperServerSvc, }; use linkerd2_proxy_transport::io::BoxedIo; use linkerd2_proxy_transport::tls::{ @@ -39,10 +39,10 @@ impl AcceptPermittedClients { let svc = res_body_as_payload::Service::new(req_box_body::Service::new(TapServer::new(tap))); - // TODO do we need to set a contextual tracing executor on Hyper? ServeFuture(Box::new( hyper::server::conn::Http::new() .http2_only(true) + .with_executor(trace::Executor::new()) .serve_connection(io, HyperServerSvc::new(svc)) .map_err(Into::into), ))