From af7dd531deabbc6efab80c1188faf8fb6cafc482 Mon Sep 17 00:00:00 2001 From: Nicolas Savoire Date: Mon, 11 Apr 2022 08:15:00 +0000 Subject: [PATCH] Tentative fix --- ddprof-exporter/src/connector.rs | 83 +++++++++++++++++--------------- ddprof-exporter/src/lib.rs | 4 +- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index 111f450..7aff6ca 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -63,7 +63,7 @@ pub(crate) mod uds { pin_project! { #[project = ConnStreamProj] - pub(crate) enum ConnStream { + pub enum ConnStream { Tcp{ #[pin] transport: tokio::net::TcpStream }, Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, Udp{ #[pin] transport: tokio::net::UnixStream }, @@ -71,8 +71,8 @@ pub(crate) mod uds { } } -use futures::{TryFutureExt, FutureExt}; use futures::future::MaybeDone; +use futures::{FutureExt, TryFutureExt}; use hyper::client::HttpConnector; use hyper::service::Service; use hyper_rustls::MaybeHttpsStream; @@ -85,7 +85,8 @@ use uds::{ConnStream, ConnStreamProj}; pin_project_lite::pin_project! { #[project = ConnStreamProj] pub(crate) enum ConnStream { - Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream }, + Tcp{ #[pin] transport: tokio::net::TcpStream }, + Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, } } @@ -106,16 +107,17 @@ impl MaybeHttpsConnector { fn build_https_connector() -> Option> { let certs = load_root_certs()?; - let client_config = - ClientConfig::builder() + let client_config = ClientConfig::builder() .with_safe_defaults() .with_root_certificates(certs) .with_no_client_auth(); - Some(hyper_rustls::HttpsConnectorBuilder::new() - .with_tls_config(client_config) - .https_or_http() - .enable_http1() - .build()) + Some( + hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(client_config) + .https_or_http() + .enable_http1() + .build(), + ) } fn load_root_certs() -> Option { @@ -123,14 +125,11 @@ fn load_root_certs() -> Option { let mut valid_count = 0; let mut invalid_count = 0; - for cert in rustls_native_certs::load_native_certs().ok()? - { + for cert in rustls_native_certs::load_native_certs().ok()? { let cert = rustls::Certificate(cert.0); match roots.add(&cert) { Ok(_) => valid_count += 1, - Err(err) => { - invalid_count += 1 - } + Err(err) => invalid_count += 1, } } if roots.is_empty() { @@ -160,7 +159,8 @@ impl hyper::client::connect::Connection for ConnStream { Self::Tcp { transport } => transport.connected(), Self::Tls { transport } => { let (tcp, tls) = transport.get_ref(); - if tls.alpn_protocol() == Some(b"h2") { // TODO/QUESTION: is it safe, future proof, to implement this ourselves ? + if tls.alpn_protocol() == Some(b"h2") { + // TODO/QUESTION: is it safe, future proof, to implement this ourselves ? tcp.connected().negotiated_h2() } else { tcp.connected() @@ -233,40 +233,42 @@ impl hyper::service::Service for MaybeHttpsConnector { } }), Some("https") => match self { - Self::Http(_) => todo!(),// TODO: return error + Self::Http(_) => todo!(), // TODO: return error Self::Https(c) => { + let fut = c.call(uri); Box::pin(async { - match c.call(uri).await? { + match fut.await? { MaybeHttpsStream::Http(_) => todo!(), - MaybeHttpsStream::Https(t) => { - Ok(ConnStream::Tls { - transport: t - }) - }, + MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }), } }) - }, + } + }, + _ => match self { + Self::Http(c) => { + let fut = c.call(uri); + Box::pin(async { + Ok(ConnStream::Tcp { + transport: fut.await?, + }) + }) + } + Self::Https(c) => { + let fut = c.call(uri); + Box::pin(async { + match fut.await? { + MaybeHttpsStream::Http(t) => Ok(ConnStream::Tcp { transport: t }), + MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }), + } + }) + } }, - _ => { - let fut = async { - let stream = match self { - MaybeHttpsConnector::Http(c) => ConnStream::Tcp { transport: c.call(uri).await? }, - MaybeHttpsConnector::Https(c) => match c.call(uri).await? { - MaybeHttpsStream::Http(t) => ConnStream::Tcp { transport: t}, - MaybeHttpsStream::Https(t) => ConnStream::Tls{ transport: t}, - }, - }; - - Ok(stream) - }; - Box::pin(fut) - } } } fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self { - MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into() ), + MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into()), MaybeHttpsConnector::Https(c) => c.poll_ready(cx), } } @@ -280,6 +282,7 @@ mod tests { /// Verify that the Connector type implements the correct bound Connect + Clone /// to be able to use the hyper::Client fn test_hyper_client_from_connector() { - let _: hyper::Client = hyper::Client::builder().build(MaybeHttpsConnector::new()); + let _: hyper::Client = + hyper::Client::builder().build(MaybeHttpsConnector::new()); } } diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index 870ac23..87ded6c 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -23,7 +23,7 @@ pub use connector::uds::socket_path_to_uri; const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID"; -type HttpClient = hyper::Client; +type HttpClient = hyper::Client; pub struct Exporter { client: HttpClient, @@ -227,7 +227,7 @@ impl Exporter { // Set idle to 0, which prevents the pipe being broken every 2nd request let client = hyper::Client::builder() .pool_max_idle_per_host(0) - .build(connector::Connector::new()); + .build(connector::MaybeHttpsConnector::new()); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?;