From 13b2098b71f6a2508a1d1533526a08928b6125a3 Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Tue, 5 Apr 2022 14:18:49 +0200 Subject: [PATCH 01/13] Proposal for refactoring of https connector to make root certificates optional --- ddprof-exporter/Cargo.toml | 3 + ddprof-exporter/src/connector.rs | 115 ++++++++++++++++++++++++++----- 2 files changed, 99 insertions(+), 19 deletions(-) diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index d5a45e4..eff17c7 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -20,12 +20,15 @@ libc = "0.2" regex = "1.5" hyper = { version = "0.14", features = ["http1", "client", "tcp", "stream"], default-features = false } tokio = { version = "1.8", features = ["rt"]} +tokio-rustls = { version = "0.23" } percent-encoding = "2.1" futures-core = { version = "0.3.0", default-features = false } futures-util = { version = "0.3.0", default-features = false } mime_guess = { version = "2.0", default-features = false } http-body = "0.4" pin-project-lite = "0.2.0" +rustls = { version = "0.20.4", default-features = false } +rustls-native-certs = { version = "0.6" } hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1", "tls12"] } hex = "0.4" hyper-multipart-rfc7578 = { git = "https://github.com/paullegranddc/rust-multipart-rfc7578.git", rev = "8dcedc266e50876c04c91d24390fe9ac44f10b96" } diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index 5b2bd62..111f450 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -64,12 +64,20 @@ pub(crate) mod uds { pin_project! { #[project = ConnStreamProj] pub(crate) enum ConnStream { - Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream }, + Tcp{ #[pin] transport: tokio::net::TcpStream }, + Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, Udp{ #[pin] transport: tokio::net::UnixStream }, } } } +use futures::{TryFutureExt, FutureExt}; +use futures::future::MaybeDone; +use hyper::client::HttpConnector; +use hyper::service::Service; +use hyper_rustls::MaybeHttpsStream; +use rustls::ClientConfig; + #[cfg(unix)] use uds::{ConnStream, ConnStreamProj}; @@ -82,22 +90,55 @@ pin_project_lite::pin_project! { } #[derive(Clone)] -pub(crate) struct Connector { - tcp: hyper_rustls::HttpsConnector, +pub enum MaybeHttpsConnector { + Http(hyper::client::HttpConnector), + Https(hyper_rustls::HttpsConnector), } -impl Connector { +impl MaybeHttpsConnector { pub(crate) fn new() -> Self { - Self { - tcp: hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .build(), + match build_https_connector() { + Some(connector) => MaybeHttpsConnector::Https(connector), + None => MaybeHttpsConnector::Http(HttpConnector::new()), } } } +fn build_https_connector() -> Option> { + let certs = load_root_certs()?; + let client_config = + ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(certs) + .with_no_client_auth(); + Some(hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(client_config) + .https_or_http() + .enable_http1() + .build()) +} + +fn load_root_certs() -> Option { + let mut roots = rustls::RootCertStore::empty(); + let mut valid_count = 0; + let mut invalid_count = 0; + + for cert in rustls_native_certs::load_native_certs().ok()? + { + let cert = rustls::Certificate(cert.0); + match roots.add(&cert) { + Ok(_) => valid_count += 1, + Err(err) => { + invalid_count += 1 + } + } + } + if roots.is_empty() { + return None; + } + Some(roots) +} + impl tokio::io::AsyncRead for ConnStream { fn poll_read( self: Pin<&mut Self>, @@ -106,6 +147,7 @@ impl tokio::io::AsyncRead for ConnStream { ) -> Poll> { match self.project() { ConnStreamProj::Tcp { transport } => transport.poll_read(cx, buf), + ConnStreamProj::Tls { transport } => transport.poll_read(cx, buf), #[cfg(unix)] ConnStreamProj::Udp { transport } => transport.poll_read(cx, buf), } @@ -116,6 +158,14 @@ impl hyper::client::connect::Connection for ConnStream { fn connected(&self) -> hyper::client::connect::Connected { match self { Self::Tcp { transport } => transport.connected(), + Self::Tls { transport } => { + let (tcp, tls) = transport.get_ref(); + if tls.alpn_protocol() == Some(b"h2") { // TODO/QUESTION: is it safe, future proof, to implement this ourselves ? + tcp.connected().negotiated_h2() + } else { + tcp.connected() + } + } #[cfg(unix)] Self::Udp { transport: _ } => hyper::client::connect::Connected::new(), } @@ -130,6 +180,7 @@ impl tokio::io::AsyncWrite for ConnStream { ) -> Poll> { match self.project() { ConnStreamProj::Tcp { transport } => transport.poll_write(cx, buf), + ConnStreamProj::Tls { transport } => transport.poll_write(cx, buf), #[cfg(unix)] ConnStreamProj::Udp { transport } => transport.poll_write(cx, buf), } @@ -141,6 +192,7 @@ impl tokio::io::AsyncWrite for ConnStream { ) -> Poll> { match self.project() { ConnStreamProj::Tcp { transport } => transport.poll_shutdown(cx), + ConnStreamProj::Tls { transport } => transport.poll_shutdown(cx), #[cfg(unix)] ConnStreamProj::Udp { transport } => transport.poll_shutdown(cx), } @@ -149,13 +201,14 @@ impl tokio::io::AsyncWrite for ConnStream { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { ConnStreamProj::Tcp { transport } => transport.poll_flush(cx), + ConnStreamProj::Tls { transport } => transport.poll_flush(cx), #[cfg(unix)] ConnStreamProj::Udp { transport } => transport.poll_flush(cx), } } } -impl hyper::service::Service for Connector { +impl hyper::service::Service for MaybeHttpsConnector { type Response = ConnStream; type Error = Box; @@ -179,19 +232,43 @@ impl hyper::service::Service for Connector { Err(crate::errors::Error::UnixSockeUnsuported.into()) } }), - _ => { - let fut = self.tcp.call(uri); - Box::pin(async { - Ok(ConnStream::Tcp { - transport: fut.await?, + Some("https") => match self { + Self::Http(_) => todo!(),// TODO: return error + Self::Https(c) => { + Box::pin(async { + match c.call(uri).await? { + MaybeHttpsStream::Http(_) => todo!(), + MaybeHttpsStream::Https(t) => { + Ok(ConnStream::Tls { + transport: t + }) + }, + } }) - }) + }, + }, + _ => { + let fut = async { + let stream = match self { + MaybeHttpsConnector::Http(c) => ConnStream::Tcp { transport: c.call(uri).await? }, + MaybeHttpsConnector::Https(c) => match c.call(uri).await? { + MaybeHttpsStream::Http(t) => ConnStream::Tcp { transport: t}, + MaybeHttpsStream::Https(t) => ConnStream::Tls{ transport: t}, + }, + }; + + Ok(stream) + }; + Box::pin(fut) } } } fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.tcp.poll_ready(cx) + match self { + MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into() ), + MaybeHttpsConnector::Https(c) => c.poll_ready(cx), + } } } @@ -203,6 +280,6 @@ mod tests { /// Verify that the Connector type implements the correct bound Connect + Clone /// to be able to use the hyper::Client fn test_hyper_client_from_connector() { - let _: hyper::Client = hyper::Client::builder().build(Connector::new()); + let _: hyper::Client = hyper::Client::builder().build(MaybeHttpsConnector::new()); } } From f78daf302d31c56f885d07c2e47c5465d8a1f0de Mon Sep 17 00:00:00 2001 From: r1viollet <74836499+r1viollet@users.noreply.github.com> Date: Mon, 11 Apr 2022 12:39:43 +0200 Subject: [PATCH 02/13] Tentative fix (#48) Co-authored-by: Nicolas Savoire --- ddprof-exporter/src/connector.rs | 83 +++++++++++++++++--------------- ddprof-exporter/src/lib.rs | 4 +- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index 111f450..7aff6ca 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -63,7 +63,7 @@ pub(crate) mod uds { pin_project! { #[project = ConnStreamProj] - pub(crate) enum ConnStream { + pub enum ConnStream { Tcp{ #[pin] transport: tokio::net::TcpStream }, Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, Udp{ #[pin] transport: tokio::net::UnixStream }, @@ -71,8 +71,8 @@ pub(crate) mod uds { } } -use futures::{TryFutureExt, FutureExt}; use futures::future::MaybeDone; +use futures::{FutureExt, TryFutureExt}; use hyper::client::HttpConnector; use hyper::service::Service; use hyper_rustls::MaybeHttpsStream; @@ -85,7 +85,8 @@ use uds::{ConnStream, ConnStreamProj}; pin_project_lite::pin_project! { #[project = ConnStreamProj] pub(crate) enum ConnStream { - Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream }, + Tcp{ #[pin] transport: tokio::net::TcpStream }, + Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, } } @@ -106,16 +107,17 @@ impl MaybeHttpsConnector { fn build_https_connector() -> Option> { let certs = load_root_certs()?; - let client_config = - ClientConfig::builder() + let client_config = ClientConfig::builder() .with_safe_defaults() .with_root_certificates(certs) .with_no_client_auth(); - Some(hyper_rustls::HttpsConnectorBuilder::new() - .with_tls_config(client_config) - .https_or_http() - .enable_http1() - .build()) + Some( + hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(client_config) + .https_or_http() + .enable_http1() + .build(), + ) } fn load_root_certs() -> Option { @@ -123,14 +125,11 @@ fn load_root_certs() -> Option { let mut valid_count = 0; let mut invalid_count = 0; - for cert in rustls_native_certs::load_native_certs().ok()? - { + for cert in rustls_native_certs::load_native_certs().ok()? { let cert = rustls::Certificate(cert.0); match roots.add(&cert) { Ok(_) => valid_count += 1, - Err(err) => { - invalid_count += 1 - } + Err(err) => invalid_count += 1, } } if roots.is_empty() { @@ -160,7 +159,8 @@ impl hyper::client::connect::Connection for ConnStream { Self::Tcp { transport } => transport.connected(), Self::Tls { transport } => { let (tcp, tls) = transport.get_ref(); - if tls.alpn_protocol() == Some(b"h2") { // TODO/QUESTION: is it safe, future proof, to implement this ourselves ? + if tls.alpn_protocol() == Some(b"h2") { + // TODO/QUESTION: is it safe, future proof, to implement this ourselves ? tcp.connected().negotiated_h2() } else { tcp.connected() @@ -233,40 +233,42 @@ impl hyper::service::Service for MaybeHttpsConnector { } }), Some("https") => match self { - Self::Http(_) => todo!(),// TODO: return error + Self::Http(_) => todo!(), // TODO: return error Self::Https(c) => { + let fut = c.call(uri); Box::pin(async { - match c.call(uri).await? { + match fut.await? { MaybeHttpsStream::Http(_) => todo!(), - MaybeHttpsStream::Https(t) => { - Ok(ConnStream::Tls { - transport: t - }) - }, + MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }), } }) - }, + } + }, + _ => match self { + Self::Http(c) => { + let fut = c.call(uri); + Box::pin(async { + Ok(ConnStream::Tcp { + transport: fut.await?, + }) + }) + } + Self::Https(c) => { + let fut = c.call(uri); + Box::pin(async { + match fut.await? { + MaybeHttpsStream::Http(t) => Ok(ConnStream::Tcp { transport: t }), + MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }), + } + }) + } }, - _ => { - let fut = async { - let stream = match self { - MaybeHttpsConnector::Http(c) => ConnStream::Tcp { transport: c.call(uri).await? }, - MaybeHttpsConnector::Https(c) => match c.call(uri).await? { - MaybeHttpsStream::Http(t) => ConnStream::Tcp { transport: t}, - MaybeHttpsStream::Https(t) => ConnStream::Tls{ transport: t}, - }, - }; - - Ok(stream) - }; - Box::pin(fut) - } } } fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self { - MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into() ), + MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into()), MaybeHttpsConnector::Https(c) => c.poll_ready(cx), } } @@ -280,6 +282,7 @@ mod tests { /// Verify that the Connector type implements the correct bound Connect + Clone /// to be able to use the hyper::Client fn test_hyper_client_from_connector() { - let _: hyper::Client = hyper::Client::builder().build(MaybeHttpsConnector::new()); + let _: hyper::Client = + hyper::Client::builder().build(MaybeHttpsConnector::new()); } } diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index 870ac23..87ded6c 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -23,7 +23,7 @@ pub use connector::uds::socket_path_to_uri; const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID"; -type HttpClient = hyper::Client; +type HttpClient = hyper::Client; pub struct Exporter { client: HttpClient, @@ -227,7 +227,7 @@ impl Exporter { // Set idle to 0, which prevents the pipe being broken every 2nd request let client = hyper::Client::builder() .pool_max_idle_per_host(0) - .build(connector::Connector::new()); + .build(connector::MaybeHttpsConnector::new()); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; From 80266745a1022327551f4c88ce2fd371ae61de5b Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Mon, 11 Apr 2022 15:38:47 +0200 Subject: [PATCH 03/13] return error when HTTPs cannot be used but is requested --- ddprof-exporter/src/connector.rs | 14 ++++++++------ ddprof-exporter/src/errors.rs | 2 ++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index 7aff6ca..08f5070 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -71,10 +71,8 @@ pub(crate) mod uds { } } -use futures::future::MaybeDone; -use futures::{FutureExt, TryFutureExt}; +use futures::{future, FutureExt}; use hyper::client::HttpConnector; -use hyper::service::Service; use hyper_rustls::MaybeHttpsStream; use rustls::ClientConfig; @@ -122,7 +120,6 @@ fn build_https_connector() -> Option Option { let mut roots = rustls::RootCertStore::empty(); - let mut valid_count = 0; let mut invalid_count = 0; for cert in rustls_native_certs::load_native_certs().ok()? { @@ -233,12 +230,17 @@ impl hyper::service::Service for MaybeHttpsConnector { } }), Some("https") => match self { - Self::Http(_) => todo!(), // TODO: return error + Self::Http(_) => future::err::( + crate::errors::Error::CannotEstablishTlsConnection.into(), + ) + .boxed(), Self::Https(c) => { let fut = c.call(uri); Box::pin(async { match fut.await? { - MaybeHttpsStream::Http(_) => todo!(), + MaybeHttpsStream::Http(_) => { + Err(crate::errors::Error::CannotEstablishTlsConnection.into()) + } MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }), } }) diff --git a/ddprof-exporter/src/errors.rs b/ddprof-exporter/src/errors.rs index 9fff513..26a4420 100644 --- a/ddprof-exporter/src/errors.rs +++ b/ddprof-exporter/src/errors.rs @@ -7,6 +7,7 @@ pub(crate) enum Error { InvalidUrl, OperationTimedOut, UnixSockeUnsuported, + CannotEstablishTlsConnection, } impl fmt::Display for Error { @@ -15,6 +16,7 @@ impl fmt::Display for Error { Self::InvalidUrl => "invalid url", Self::OperationTimedOut => "operation timed out", Self::UnixSockeUnsuported => "unix sockets unsuported on windows", + Self::CannotEstablishTlsConnection => "cannot establish requested secure TLS connection", }) } } From 6e59f7e7b3f6b4357a2da2b32331ca152d2c50a4 Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Mon, 11 Apr 2022 15:56:44 +0200 Subject: [PATCH 04/13] Correctly handle error when no root certificates are found --- Cargo.lock | 5 +++++ ddprof-exporter/Cargo.toml | 1 + ddprof-exporter/src/connector.rs | 33 +++++++++++++------------------- ddprof-exporter/src/errors.rs | 2 ++ 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2d3b44..376ada1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,7 @@ dependencies = [ name = "ddprof-exporter" version = "0.5.0-rc.1" dependencies = [ + "anyhow", "bytes", "chrono", "futures", @@ -133,7 +134,10 @@ dependencies = [ "percent-encoding", "pin-project-lite", "regex", + "rustls", + "rustls-native-certs", "tokio", + "tokio-rustls", ] [[package]] @@ -739,6 +743,7 @@ version = "0.20.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fbfeb8d0ddb84706bc597a5574ab8912817c52a397f819e5b614e2265206921" dependencies = [ + "log", "ring", "sct", "webpki", diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index eff17c7..a44362b 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -11,6 +11,7 @@ license = "Apache-2.0" crate-type = ["cdylib", "lib"] [dependencies] +anyhow = "1.0" bytes = "1.0" chrono = "0.4" futures = "0.3" diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs index 08f5070..9e7dee1 100644 --- a/ddprof-exporter/src/connector.rs +++ b/ddprof-exporter/src/connector.rs @@ -97,19 +97,19 @@ pub enum MaybeHttpsConnector { impl MaybeHttpsConnector { pub(crate) fn new() -> Self { match build_https_connector() { - Some(connector) => MaybeHttpsConnector::Https(connector), - None => MaybeHttpsConnector::Http(HttpConnector::new()), + Ok(connector) => MaybeHttpsConnector::Https(connector), + Err(_) => MaybeHttpsConnector::Http(HttpConnector::new()), } } } -fn build_https_connector() -> Option> { +fn build_https_connector() -> anyhow::Result> { let certs = load_root_certs()?; let client_config = ClientConfig::builder() .with_safe_defaults() .with_root_certificates(certs) .with_no_client_auth(); - Some( + Ok( hyper_rustls::HttpsConnectorBuilder::new() .with_tls_config(client_config) .https_or_http() @@ -118,21 +118,19 @@ fn build_https_connector() -> Option Option { +fn load_root_certs() -> anyhow::Result { let mut roots = rustls::RootCertStore::empty(); - let mut invalid_count = 0; - for cert in rustls_native_certs::load_native_certs().ok()? { + for cert in rustls_native_certs::load_native_certs()? { let cert = rustls::Certificate(cert.0); - match roots.add(&cert) { - Ok(_) => valid_count += 1, - Err(err) => invalid_count += 1, - } + + //TODO: log when invalid cert is loaded + roots.add(&cert).ok(); } if roots.is_empty() { - return None; + return Err(crate::errors::Error::NoValidCertifacteRootsFound.into()); } - Some(roots) + Ok(roots) } impl tokio::io::AsyncRead for ConnStream { @@ -155,13 +153,8 @@ impl hyper::client::connect::Connection for ConnStream { match self { Self::Tcp { transport } => transport.connected(), Self::Tls { transport } => { - let (tcp, tls) = transport.get_ref(); - if tls.alpn_protocol() == Some(b"h2") { - // TODO/QUESTION: is it safe, future proof, to implement this ourselves ? - tcp.connected().negotiated_h2() - } else { - tcp.connected() - } + let (tcp, _) = transport.get_ref(); + tcp.connected() } #[cfg(unix)] Self::Udp { transport: _ } => hyper::client::connect::Connected::new(), diff --git a/ddprof-exporter/src/errors.rs b/ddprof-exporter/src/errors.rs index 26a4420..16f43fc 100644 --- a/ddprof-exporter/src/errors.rs +++ b/ddprof-exporter/src/errors.rs @@ -8,6 +8,7 @@ pub(crate) enum Error { OperationTimedOut, UnixSockeUnsuported, CannotEstablishTlsConnection, + NoValidCertifacteRootsFound, } impl fmt::Display for Error { @@ -17,6 +18,7 @@ impl fmt::Display for Error { Self::OperationTimedOut => "operation timed out", Self::UnixSockeUnsuported => "unix sockets unsuported on windows", Self::CannotEstablishTlsConnection => "cannot establish requested secure TLS connection", + Self::NoValidCertifacteRootsFound => "native tls couldn't find any valid certifacte roots" }) } } From 46887c42241f8fcce9c22185cb3faf1074ccb5ba Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Mon, 11 Apr 2022 15:59:57 +0200 Subject: [PATCH 05/13] Use latest common multipart released package --- Cargo.lock | 10 ++++++---- ddprof-exporter/Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 376ada1..3f3b723 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,8 +74,9 @@ dependencies = [ [[package]] name = "common-multipart-rfc7578" -version = "0.4.2" -source = "git+https://github.com/paullegranddc/rust-multipart-rfc7578.git?rev=8dcedc266e50876c04c91d24390fe9ac44f10b96#8dcedc266e50876c04c91d24390fe9ac44f10b96" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22328b3864f1d8dbe7036f3f2fdfdcb1f367af43dca418943d396fbf8c4b8021" dependencies = [ "bytes", "futures-core", @@ -369,8 +370,9 @@ dependencies = [ [[package]] name = "hyper-multipart-rfc7578" -version = "0.6.2" -source = "git+https://github.com/paullegranddc/rust-multipart-rfc7578.git?rev=8dcedc266e50876c04c91d24390fe9ac44f10b96#8dcedc266e50876c04c91d24390fe9ac44f10b96" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63ca8108ac0ae98d310d41cddb11c6b822e8aca865dbe421366934e6f7f72e10" dependencies = [ "bytes", "common-multipart-rfc7578", diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index a44362b..11d2ad7 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -32,7 +32,7 @@ rustls = { version = "0.20.4", default-features = false } rustls-native-certs = { version = "0.6" } hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1", "tls12"] } hex = "0.4" -hyper-multipart-rfc7578 = { git = "https://github.com/paullegranddc/rust-multipart-rfc7578.git", rev = "8dcedc266e50876c04c91d24390fe9ac44f10b96" } +hyper-multipart-rfc7578 = "0.7.0" [dev-dependencies] maplit = "1.0" From 7fa68d27e47681f543d418e64eaad5bf2d0be910 Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Tue, 12 Apr 2022 10:16:25 +0200 Subject: [PATCH 06/13] Cleanup implementation + add tests --- Cargo.lock | 12 + ddprof-exporter/Cargo.toml | 2 +- ddprof-exporter/src/connector.rs | 283 ------------------- ddprof-exporter/src/connector/conn_stream.rs | 135 +++++++++ ddprof-exporter/src/connector/mod.rs | 146 ++++++++++ ddprof-exporter/src/connector/uds.rs | 52 ++++ ddprof-exporter/src/errors.rs | 10 +- 7 files changed, 353 insertions(+), 287 deletions(-) delete mode 100644 ddprof-exporter/src/connector.rs create mode 100644 ddprof-exporter/src/connector/conn_stream.rs create mode 100644 ddprof-exporter/src/connector/mod.rs create mode 100644 ddprof-exporter/src/connector/uds.rs diff --git a/Cargo.lock b/Cargo.lock index 3f3b723..88af977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -902,9 +902,21 @@ dependencies = [ "mio", "pin-project-lite", "socket2", + "tokio-macros", "winapi", ] +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-rustls" version = "0.23.2" diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index 11d2ad7..92c3977 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -20,7 +20,7 @@ lazy_static = "1.4" libc = "0.2" regex = "1.5" hyper = { version = "0.14", features = ["http1", "client", "tcp", "stream"], default-features = false } -tokio = { version = "1.8", features = ["rt"]} +tokio = { version = "1.8", features = ["rt", "macros"]} tokio-rustls = { version = "0.23" } percent-encoding = "2.1" futures-core = { version = "0.3.0", default-features = false } diff --git a/ddprof-exporter/src/connector.rs b/ddprof-exporter/src/connector.rs deleted file mode 100644 index 9e7dee1..0000000 --- a/ddprof-exporter/src/connector.rs +++ /dev/null @@ -1,283 +0,0 @@ -use std::error::Error; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -// Tokio doesn't handle unix sockets on windows -#[cfg(unix)] -pub(crate) mod uds { - use pin_project_lite::pin_project; - use std::error::Error; - use std::ffi::OsString; - use std::os::unix::ffi::{OsStrExt, OsStringExt}; - use std::path::{Path, PathBuf}; - - /// Creates a new Uri, with the `unix` scheme, and the path to the socket - /// encoded as a hex string, to prevent special characters in the url authority - pub fn socket_path_to_uri(path: &Path) -> Result> { - let path = hex::encode(path.as_os_str().as_bytes()); - Ok(hyper::Uri::builder() - .scheme("unix") - .authority(path) - .path_and_query("") - .build()?) - } - - pub fn socket_path_from_uri( - uri: &hyper::Uri, - ) -> Result> { - if uri.scheme_str() != Some("unix") { - return Err(crate::errors::Error::InvalidUrl.into()); - } - let path = hex::decode( - uri.authority() - .ok_or(crate::errors::Error::InvalidUrl)? - .as_str(), - ) - .map_err(|_| crate::errors::Error::InvalidUrl)?; - Ok(PathBuf::from(OsString::from_vec(path))) - } - - #[test] - fn test_encode_unix_socket_path_absolute() { - let expected_path = "/path/to/a/socket.sock".as_ref(); - let uri = socket_path_to_uri(expected_path).unwrap(); - assert_eq!(uri.scheme_str(), Some("unix")); - - let actual_path = socket_path_from_uri(&uri).unwrap(); - assert_eq!(actual_path.as_path(), Path::new(expected_path)) - } - - #[test] - fn test_encode_unix_socket_relative_path() { - let expected_path = "relative/path/to/a/socket.sock".as_ref(); - let uri = socket_path_to_uri(expected_path).unwrap(); - let actual_path = socket_path_from_uri(&uri).unwrap(); - assert_eq!(actual_path.as_path(), Path::new(expected_path)); - - let expected_path = "./relative/path/to/a/socket.sock".as_ref(); - let uri = socket_path_to_uri(expected_path).unwrap(); - let actual_path = socket_path_from_uri(&uri).unwrap(); - assert_eq!(actual_path.as_path(), Path::new(expected_path)); - } - - pin_project! { - #[project = ConnStreamProj] - pub enum ConnStream { - Tcp{ #[pin] transport: tokio::net::TcpStream }, - Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, - Udp{ #[pin] transport: tokio::net::UnixStream }, - } - } -} - -use futures::{future, FutureExt}; -use hyper::client::HttpConnector; -use hyper_rustls::MaybeHttpsStream; -use rustls::ClientConfig; - -#[cfg(unix)] -use uds::{ConnStream, ConnStreamProj}; - -#[cfg(not(unix))] -pin_project_lite::pin_project! { - #[project = ConnStreamProj] - pub(crate) enum ConnStream { - Tcp{ #[pin] transport: tokio::net::TcpStream }, - Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, - } -} - -#[derive(Clone)] -pub enum MaybeHttpsConnector { - Http(hyper::client::HttpConnector), - Https(hyper_rustls::HttpsConnector), -} - -impl MaybeHttpsConnector { - pub(crate) fn new() -> Self { - match build_https_connector() { - Ok(connector) => MaybeHttpsConnector::Https(connector), - Err(_) => MaybeHttpsConnector::Http(HttpConnector::new()), - } - } -} - -fn build_https_connector() -> anyhow::Result> { - let certs = load_root_certs()?; - let client_config = ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(certs) - .with_no_client_auth(); - Ok( - hyper_rustls::HttpsConnectorBuilder::new() - .with_tls_config(client_config) - .https_or_http() - .enable_http1() - .build(), - ) -} - -fn load_root_certs() -> anyhow::Result { - let mut roots = rustls::RootCertStore::empty(); - - for cert in rustls_native_certs::load_native_certs()? { - let cert = rustls::Certificate(cert.0); - - //TODO: log when invalid cert is loaded - roots.add(&cert).ok(); - } - if roots.is_empty() { - return Err(crate::errors::Error::NoValidCertifacteRootsFound.into()); - } - Ok(roots) -} - -impl tokio::io::AsyncRead for ConnStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - match self.project() { - ConnStreamProj::Tcp { transport } => transport.poll_read(cx, buf), - ConnStreamProj::Tls { transport } => transport.poll_read(cx, buf), - #[cfg(unix)] - ConnStreamProj::Udp { transport } => transport.poll_read(cx, buf), - } - } -} - -impl hyper::client::connect::Connection for ConnStream { - fn connected(&self) -> hyper::client::connect::Connected { - match self { - Self::Tcp { transport } => transport.connected(), - Self::Tls { transport } => { - let (tcp, _) = transport.get_ref(); - tcp.connected() - } - #[cfg(unix)] - Self::Udp { transport: _ } => hyper::client::connect::Connected::new(), - } - } -} - -impl tokio::io::AsyncWrite for ConnStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - match self.project() { - ConnStreamProj::Tcp { transport } => transport.poll_write(cx, buf), - ConnStreamProj::Tls { transport } => transport.poll_write(cx, buf), - #[cfg(unix)] - ConnStreamProj::Udp { transport } => transport.poll_write(cx, buf), - } - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match self.project() { - ConnStreamProj::Tcp { transport } => transport.poll_shutdown(cx), - ConnStreamProj::Tls { transport } => transport.poll_shutdown(cx), - #[cfg(unix)] - ConnStreamProj::Udp { transport } => transport.poll_shutdown(cx), - } - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - ConnStreamProj::Tcp { transport } => transport.poll_flush(cx), - ConnStreamProj::Tls { transport } => transport.poll_flush(cx), - #[cfg(unix)] - ConnStreamProj::Udp { transport } => transport.poll_flush(cx), - } - } -} - -impl hyper::service::Service for MaybeHttpsConnector { - type Response = ConnStream; - type Error = Box; - - // This lint gets lifted in this place in a newer version, see: - // https://github.com/rust-lang/rust-clippy/pull/8030 - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn call(&mut self, uri: hyper::Uri) -> Self::Future { - match uri.scheme_str() { - Some("unix") => Box::pin(async move { - #[cfg(unix)] - { - let path = uds::socket_path_from_uri(&uri)?; - Ok(ConnStream::Udp { - transport: tokio::net::UnixStream::connect(path).await?, - }) - } - #[cfg(not(unix))] - { - Err(crate::errors::Error::UnixSockeUnsuported.into()) - } - }), - Some("https") => match self { - Self::Http(_) => future::err::( - crate::errors::Error::CannotEstablishTlsConnection.into(), - ) - .boxed(), - Self::Https(c) => { - let fut = c.call(uri); - Box::pin(async { - match fut.await? { - MaybeHttpsStream::Http(_) => { - Err(crate::errors::Error::CannotEstablishTlsConnection.into()) - } - MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }), - } - }) - } - }, - _ => match self { - Self::Http(c) => { - let fut = c.call(uri); - Box::pin(async { - Ok(ConnStream::Tcp { - transport: fut.await?, - }) - }) - } - Self::Https(c) => { - let fut = c.call(uri); - Box::pin(async { - match fut.await? { - MaybeHttpsStream::Http(t) => Ok(ConnStream::Tcp { transport: t }), - MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }), - } - }) - } - }, - } - } - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match self { - MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into()), - MaybeHttpsConnector::Https(c) => c.poll_ready(cx), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - /// Verify that the Connector type implements the correct bound Connect + Clone - /// to be able to use the hyper::Client - fn test_hyper_client_from_connector() { - let _: hyper::Client = - hyper::Client::builder().build(MaybeHttpsConnector::new()); - } -} diff --git a/ddprof-exporter/src/connector/conn_stream.rs b/ddprof-exporter/src/connector/conn_stream.rs new file mode 100644 index 0000000..dbde0c3 --- /dev/null +++ b/ddprof-exporter/src/connector/conn_stream.rs @@ -0,0 +1,135 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{future, Future, FutureExt, TryFutureExt}; +use hyper_rustls::HttpsConnector; +use pin_project_lite::pin_project; + +pin_project! { + #[derive(Debug)] + #[project = ConnStreamProj] + pub enum ConnStream { + Tcp{ #[pin] transport: tokio::net::TcpStream }, + Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, + #[cfg(unix)] + Udp{ #[pin] transport: tokio::net::UnixStream }, + } +} + +pub type ConnStreamError = Box; + +use hyper::{client::HttpConnector, service::Service}; +impl ConnStream { + pub async fn from_uds_uri(uri: hyper::Uri) -> Result { + #[cfg(unix)] + { + let path = super::uds::socket_path_from_uri(&uri)?; + Ok(ConnStream::Udp { + transport: tokio::net::UnixStream::connect(path).await?, + }) + } + #[cfg(not(unix))] + { + Err(crate::errors::Error::UnixSockeUnsuported.into()) + } + } + + pub fn from_http_connector_with_uri( + c: &mut HttpConnector, + uri: hyper::Uri, + ) -> impl Future> { + c.call(uri).map(|r| match r { + Ok(t) => Ok(ConnStream::Tcp { transport: t }), + Err(e) => Err(e.into()), + }) + } + + pub fn from_https_connector_with_uri( + c: &mut HttpsConnector, + uri: hyper::Uri, + require_tls: bool, + ) -> impl Future> { + c.call(uri).and_then(move |stream| match stream { + // move only require_tls + hyper_rustls::MaybeHttpsStream::Http(t) => { + if require_tls { + future::ready(Err( + crate::errors::Error::CannotEstablishTlsConnection.into() + )) + } else { + future::ready(Ok(ConnStream::Tcp { transport: t })) + } + } + hyper_rustls::MaybeHttpsStream::Https(t) => { + future::ready(Ok(ConnStream::Tls { transport: t })) + } + }) + } +} + +impl tokio::io::AsyncRead for ConnStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + match self.project() { + ConnStreamProj::Tcp { transport } => transport.poll_read(cx, buf), + ConnStreamProj::Tls { transport } => transport.poll_read(cx, buf), + #[cfg(unix)] + ConnStreamProj::Udp { transport } => transport.poll_read(cx, buf), + } + } +} + +impl hyper::client::connect::Connection for ConnStream { + fn connected(&self) -> hyper::client::connect::Connected { + match self { + Self::Tcp { transport } => transport.connected(), + Self::Tls { transport } => { + let (tcp, _) = transport.get_ref(); + tcp.connected() + } + #[cfg(unix)] + Self::Udp { transport: _ } => hyper::client::connect::Connected::new(), + } + } +} + +impl tokio::io::AsyncWrite for ConnStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.project() { + ConnStreamProj::Tcp { transport } => transport.poll_write(cx, buf), + ConnStreamProj::Tls { transport } => transport.poll_write(cx, buf), + #[cfg(unix)] + ConnStreamProj::Udp { transport } => transport.poll_write(cx, buf), + } + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.project() { + ConnStreamProj::Tcp { transport } => transport.poll_shutdown(cx), + ConnStreamProj::Tls { transport } => transport.poll_shutdown(cx), + #[cfg(unix)] + ConnStreamProj::Udp { transport } => transport.poll_shutdown(cx), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + ConnStreamProj::Tcp { transport } => transport.poll_flush(cx), + ConnStreamProj::Tls { transport } => transport.poll_flush(cx), + #[cfg(unix)] + ConnStreamProj::Udp { transport } => transport.poll_flush(cx), + } + } +} diff --git a/ddprof-exporter/src/connector/mod.rs b/ddprof-exporter/src/connector/mod.rs new file mode 100644 index 0000000..fa89228 --- /dev/null +++ b/ddprof-exporter/src/connector/mod.rs @@ -0,0 +1,146 @@ +use futures::future::BoxFuture; +use futures::{future, FutureExt}; +use hyper::client::HttpConnector; +use rustls::ClientConfig; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[cfg(unix)] +pub mod uds; + +mod conn_stream; +use conn_stream::{ConnStream, ConnStreamError}; + +#[derive(Clone)] +pub enum MaybeHttpsConnector { + Http(hyper::client::HttpConnector), + Https(hyper_rustls::HttpsConnector), +} + +impl MaybeHttpsConnector { + pub(crate) fn new() -> Self { + match build_https_connector() { + Ok(connector) => MaybeHttpsConnector::Https(connector), + Err(_) => MaybeHttpsConnector::Http(HttpConnector::new()), + } + } + + fn build_conn_stream<'a>( + &mut self, + uri: hyper::Uri, + require_tls: bool, + ) -> BoxFuture<'a, Result> { + match self { + Self::Http(c) => { + if require_tls { + future::err::( + crate::errors::Error::CannotEstablishTlsConnection.into(), + ) + .boxed() + } else { + ConnStream::from_http_connector_with_uri(c, uri).boxed() + } + } + Self::Https(c) => { + ConnStream::from_https_connector_with_uri(c, uri, require_tls).boxed() + } + } + } +} + +fn build_https_connector( +) -> anyhow::Result> { + let certs = load_root_certs()?; + let client_config = ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(certs) + .with_no_client_auth(); + Ok(hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(client_config) + .https_or_http() + .enable_http1() + .build()) +} + +fn load_root_certs() -> anyhow::Result { + let mut roots = rustls::RootCertStore::empty(); + + for cert in rustls_native_certs::load_native_certs()? { + let cert = rustls::Certificate(cert.0); + + //TODO: log when invalid cert is loaded + roots.add(&cert).ok(); + } + if roots.is_empty() { + return Err(crate::errors::Error::NoValidCertifacteRootsFound.into()); + } + Ok(roots) +} + +impl hyper::service::Service for MaybeHttpsConnector { + type Response = ConnStream; + type Error = ConnStreamError; + + // This lint gets lifted in this place in a newer version, see: + // https://github.com/rust-lang/rust-clippy/pull/8030 + #[allow(clippy::type_complexity)] + type Future = Pin> + Send>>; + + fn call(&mut self, uri: hyper::Uri) -> Self::Future { + match uri.scheme_str() { + Some("unix") => conn_stream::ConnStream::from_uds_uri(uri).boxed(), + Some("https") => self.build_conn_stream(uri, true), + _ => self.build_conn_stream(uri, false), + } + } + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match self { + MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into()), + MaybeHttpsConnector::Https(c) => c.poll_ready(cx), + } + } +} + +#[cfg(test)] +mod tests { + use std::env; + use tokio; + + use hyper::service::Service; + + use super::*; + + #[test] + /// Verify that the MaybeHttpsConnector type implements the correct bound Connect + Clone + /// to be able to use the hyper::Client + fn test_hyper_client_from_connector() { + let _: hyper::Client = + hyper::Client::builder().build(MaybeHttpsConnector::new()); + } + + #[tokio::test] + /// Verify that MaybeHttpsConnector will only allow non tls connections if root certificates + /// are not found + async fn test_missing_root_certificates_only_allow_http_connections() { + const ENV_SSL_CERT_FILE: &str = "SSL_CERT_FILE"; + let old_value = env::var(ENV_SSL_CERT_FILE).unwrap_or_default(); + + env::set_var(ENV_SSL_CERT_FILE, "this/folder/does/not/exist"); + let mut connector = MaybeHttpsConnector::new(); + assert!(matches!(connector, MaybeHttpsConnector::Http(_))); + + let stream = connector + .call(hyper::Uri::from_static("https://example.com")) + .await + .unwrap_err(); + + assert_eq!( + *stream.downcast::().unwrap(), + crate::errors::Error::CannotEstablishTlsConnection + ); + + env::set_var(ENV_SSL_CERT_FILE, old_value); + } +} diff --git a/ddprof-exporter/src/connector/uds.rs b/ddprof-exporter/src/connector/uds.rs new file mode 100644 index 0000000..6ab737c --- /dev/null +++ b/ddprof-exporter/src/connector/uds.rs @@ -0,0 +1,52 @@ +// Tokio doesn't handle unix sockets on windows +use std::error::Error; +use std::ffi::OsString; +use std::os::unix::ffi::{OsStrExt, OsStringExt}; +use std::path::{Path, PathBuf}; + +/// Creates a new Uri, with the `unix` scheme, and the path to the socket +/// encoded as a hex string, to prevent special characters in the url authority +pub fn socket_path_to_uri(path: &Path) -> Result> { + let path = hex::encode(path.as_os_str().as_bytes()); + Ok(hyper::Uri::builder() + .scheme("unix") + .authority(path) + .path_and_query("") + .build()?) +} + +pub fn socket_path_from_uri(uri: &hyper::Uri) -> anyhow::Result { + if uri.scheme_str() != Some("unix") { + return Err(crate::errors::Error::InvalidUrl.into()); + } + let path = hex::decode( + uri.authority() + .ok_or(crate::errors::Error::InvalidUrl)? + .as_str(), + ) + .map_err(|_| crate::errors::Error::InvalidUrl)?; + Ok(PathBuf::from(OsString::from_vec(path))) +} + +#[test] +fn test_encode_unix_socket_path_absolute() { + let expected_path = "/path/to/a/socket.sock".as_ref(); + let uri = socket_path_to_uri(expected_path).unwrap(); + assert_eq!(uri.scheme_str(), Some("unix")); + + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)) +} + +#[test] +fn test_encode_unix_socket_relative_path() { + let expected_path = "relative/path/to/a/socket.sock".as_ref(); + let uri = socket_path_to_uri(expected_path).unwrap(); + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)); + + let expected_path = "./relative/path/to/a/socket.sock".as_ref(); + let uri = socket_path_to_uri(expected_path).unwrap(); + let actual_path = socket_path_from_uri(&uri).unwrap(); + assert_eq!(actual_path.as_path(), Path::new(expected_path)); +} diff --git a/ddprof-exporter/src/errors.rs b/ddprof-exporter/src/errors.rs index 16f43fc..68fae81 100644 --- a/ddprof-exporter/src/errors.rs +++ b/ddprof-exporter/src/errors.rs @@ -1,7 +1,7 @@ use std::error; use std::fmt; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] #[allow(dead_code)] pub(crate) enum Error { InvalidUrl, @@ -17,8 +17,12 @@ impl fmt::Display for Error { Self::InvalidUrl => "invalid url", Self::OperationTimedOut => "operation timed out", Self::UnixSockeUnsuported => "unix sockets unsuported on windows", - Self::CannotEstablishTlsConnection => "cannot establish requested secure TLS connection", - Self::NoValidCertifacteRootsFound => "native tls couldn't find any valid certifacte roots" + Self::CannotEstablishTlsConnection => { + "cannot establish requested secure TLS connection" + } + Self::NoValidCertifacteRootsFound => { + "native tls couldn't find any valid certifacte roots" + } }) } } From 48ee3f62d442978eac57f288a8eb2d6237c05af3 Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Tue, 12 Apr 2022 10:24:04 +0200 Subject: [PATCH 07/13] Fix linting errors --- ddprof-exporter/src/connector/conn_stream.rs | 2 +- ddprof-exporter/src/connector/mod.rs | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/ddprof-exporter/src/connector/conn_stream.rs b/ddprof-exporter/src/connector/conn_stream.rs index 9bb68e9..cd30a4b 100644 --- a/ddprof-exporter/src/connector/conn_stream.rs +++ b/ddprof-exporter/src/connector/conn_stream.rs @@ -17,7 +17,7 @@ pin_project! { Tcp{ #[pin] transport: tokio::net::TcpStream }, Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, // Tokio doesn't handle unix sockets on windows - #[cfg(unix)] + #[cfg(unix)] Udp{ #[pin] transport: tokio::net::UnixStream }, } } diff --git a/ddprof-exporter/src/connector/mod.rs b/ddprof-exporter/src/connector/mod.rs index bdbfc60..73c3459 100644 --- a/ddprof-exporter/src/connector/mod.rs +++ b/ddprof-exporter/src/connector/mod.rs @@ -108,10 +108,8 @@ impl hyper::service::Service for MaybeHttpsConnector { #[cfg(test)] mod tests { - use std::env; - use tokio; - use hyper::service::Service; + use std::env; use super::*; From 1dbcca846e104e325098d8de2b07882a78bada5c Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Tue, 12 Apr 2022 18:29:17 +0200 Subject: [PATCH 08/13] enable matrix build and test on all expected platforms --- .github/workflows/test.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e4dda76..f58698a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,8 +5,11 @@ env: jobs: test: - name: cargo test --workspace - runs-on: ubuntu-latest + name: "cargo test --workspace #${{ matrix.platform }}" + runs-on: ${{ matrix.platform }} + strategy: + matrix: + platform: [windows-latest, ubuntu-latest, macos-latest] steps: - name: Checkout sources uses: actions/checkout@v2 From d5676b9cb78bd8746581477b610c6b587557e063 Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Tue, 12 Apr 2022 18:30:55 +0200 Subject: [PATCH 09/13] Rename MaybeHttpsConnector to Connector --- ddprof-exporter/src/connector/mod.rs | 26 +++++++++++++------------- ddprof-exporter/src/lib.rs | 4 ++-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ddprof-exporter/src/connector/mod.rs b/ddprof-exporter/src/connector/mod.rs index 73c3459..f55116b 100644 --- a/ddprof-exporter/src/connector/mod.rs +++ b/ddprof-exporter/src/connector/mod.rs @@ -16,16 +16,16 @@ mod conn_stream; use conn_stream::{ConnStream, ConnStreamError}; #[derive(Clone)] -pub enum MaybeHttpsConnector { +pub enum Connector { Http(hyper::client::HttpConnector), Https(hyper_rustls::HttpsConnector), } -impl MaybeHttpsConnector { +impl Connector { pub(crate) fn new() -> Self { match build_https_connector() { - Ok(connector) => MaybeHttpsConnector::Https(connector), - Err(_) => MaybeHttpsConnector::Http(HttpConnector::new()), + Ok(connector) => Connector::Https(connector), + Err(_) => Connector::Http(HttpConnector::new()), } } @@ -81,7 +81,7 @@ fn load_root_certs() -> anyhow::Result { Ok(roots) } -impl hyper::service::Service for MaybeHttpsConnector { +impl hyper::service::Service for Connector { type Response = ConnStream; type Error = ConnStreamError; @@ -100,8 +100,8 @@ impl hyper::service::Service for MaybeHttpsConnector { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self { - MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into()), - MaybeHttpsConnector::Https(c) => c.poll_ready(cx), + Connector::Http(c) => c.poll_ready(cx).map_err(|e| e.into()), + Connector::Https(c) => c.poll_ready(cx), } } } @@ -114,23 +114,23 @@ mod tests { use super::*; #[test] - /// Verify that the MaybeHttpsConnector type implements the correct bound Connect + Clone + /// Verify that the Connector type implements the correct bound Connect + Clone /// to be able to use the hyper::Client fn test_hyper_client_from_connector() { - let _: hyper::Client = - hyper::Client::builder().build(MaybeHttpsConnector::new()); + let _: hyper::Client = + hyper::Client::builder().build(Connector::new()); } #[tokio::test] - /// Verify that MaybeHttpsConnector will only allow non tls connections if root certificates + /// Verify that Connector will only allow non tls connections if root certificates /// are not found async fn test_missing_root_certificates_only_allow_http_connections() { const ENV_SSL_CERT_FILE: &str = "SSL_CERT_FILE"; let old_value = env::var(ENV_SSL_CERT_FILE).unwrap_or_default(); env::set_var(ENV_SSL_CERT_FILE, "this/folder/does/not/exist"); - let mut connector = MaybeHttpsConnector::new(); - assert!(matches!(connector, MaybeHttpsConnector::Http(_))); + let mut connector = Connector::new(); + assert!(matches!(connector, Connector::Http(_))); let stream = connector .call(hyper::Uri::from_static("https://example.com")) diff --git a/ddprof-exporter/src/lib.rs b/ddprof-exporter/src/lib.rs index 87ded6c..870ac23 100644 --- a/ddprof-exporter/src/lib.rs +++ b/ddprof-exporter/src/lib.rs @@ -23,7 +23,7 @@ pub use connector::uds::socket_path_to_uri; const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID"; -type HttpClient = hyper::Client; +type HttpClient = hyper::Client; pub struct Exporter { client: HttpClient, @@ -227,7 +227,7 @@ impl Exporter { // Set idle to 0, which prevents the pipe being broken every 2nd request let client = hyper::Client::builder() .pool_max_idle_per_host(0) - .build(connector::MaybeHttpsConnector::new()); + .build(connector::Connector::new()); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; From b37b09326fd5a83774adcbe36deb7c076e4672c0 Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Tue, 12 Apr 2022 18:35:46 +0200 Subject: [PATCH 10/13] Fix compilation on Windows --- Cargo.lock | 22 +++++++++++++++++++- ddprof-exporter/Cargo.toml | 2 +- ddprof-exporter/src/connector/conn_stream.rs | 2 +- ddprof-exporter/src/connector/mod.rs | 3 +-- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88af977..decd95e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,7 +133,7 @@ dependencies = [ "maplit", "mime_guess", "percent-encoding", - "pin-project-lite", + "pin-project", "regex", "rustls", "rustls-native-certs", @@ -572,6 +572,26 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pin-project" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.8" diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index 92c3977..eab130e 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -27,7 +27,7 @@ futures-core = { version = "0.3.0", default-features = false } futures-util = { version = "0.3.0", default-features = false } mime_guess = { version = "2.0", default-features = false } http-body = "0.4" -pin-project-lite = "0.2.0" +pin-project = "1.0" rustls = { version = "0.20.4", default-features = false } rustls-native-certs = { version = "0.6" } hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1", "tls12"] } diff --git a/ddprof-exporter/src/connector/conn_stream.rs b/ddprof-exporter/src/connector/conn_stream.rs index cd30a4b..579f010 100644 --- a/ddprof-exporter/src/connector/conn_stream.rs +++ b/ddprof-exporter/src/connector/conn_stream.rs @@ -8,7 +8,7 @@ use std::{ use futures::{future, Future, FutureExt, TryFutureExt}; use hyper_rustls::HttpsConnector; -use pin_project_lite::pin_project; +use pin_project::pin_project; pin_project! { #[derive(Debug)] diff --git a/ddprof-exporter/src/connector/mod.rs b/ddprof-exporter/src/connector/mod.rs index f55116b..b104e1d 100644 --- a/ddprof-exporter/src/connector/mod.rs +++ b/ddprof-exporter/src/connector/mod.rs @@ -117,8 +117,7 @@ mod tests { /// Verify that the Connector type implements the correct bound Connect + Clone /// to be able to use the hyper::Client fn test_hyper_client_from_connector() { - let _: hyper::Client = - hyper::Client::builder().build(Connector::new()); + let _: hyper::Client = hyper::Client::builder().build(Connector::new()); } #[tokio::test] From 6924d1c7eade73b6d833cf6353725623f023677f Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Tue, 12 Apr 2022 18:45:16 +0200 Subject: [PATCH 11/13] Make pin_project_lite usage compile on windows --- .github/actions/cache/action.yaml | 4 +-- Cargo.lock | 22 +------------- ddprof-exporter/Cargo.toml | 2 +- ddprof-exporter/src/connector/conn_stream.rs | 30 ++++++++++++++++---- 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/.github/actions/cache/action.yaml b/.github/actions/cache/action.yaml index b5ae639..625d672 100644 --- a/.github/actions/cache/action.yaml +++ b/.github/actions/cache/action.yaml @@ -1,5 +1,5 @@ -name: '[rust] Checkout and cache' -description: '[rust] Checkout cache' +name: '[rust] Cache' +description: '[rust] Cache' runs: using: composite diff --git a/Cargo.lock b/Cargo.lock index decd95e..88af977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,7 +133,7 @@ dependencies = [ "maplit", "mime_guess", "percent-encoding", - "pin-project", + "pin-project-lite", "regex", "rustls", "rustls-native-certs", @@ -572,26 +572,6 @@ dependencies = [ "indexmap", ] -[[package]] -name = "pin-project" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.8" diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index eab130e..b915dd1 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -27,7 +27,7 @@ futures-core = { version = "0.3.0", default-features = false } futures-util = { version = "0.3.0", default-features = false } mime_guess = { version = "2.0", default-features = false } http-body = "0.4" -pin-project = "1.0" +pin-project-lite = "0.2.8" rustls = { version = "0.20.4", default-features = false } rustls-native-certs = { version = "0.6" } hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1", "tls12"] } diff --git a/ddprof-exporter/src/connector/conn_stream.rs b/ddprof-exporter/src/connector/conn_stream.rs index 579f010..ba6e8f4 100644 --- a/ddprof-exporter/src/connector/conn_stream.rs +++ b/ddprof-exporter/src/connector/conn_stream.rs @@ -8,17 +8,37 @@ use std::{ use futures::{future, Future, FutureExt, TryFutureExt}; use hyper_rustls::HttpsConnector; -use pin_project::pin_project; +use pin_project_lite::pin_project; +#[cfg(unix)] pin_project! { #[derive(Debug)] #[project = ConnStreamProj] pub enum ConnStream { - Tcp{ #[pin] transport: tokio::net::TcpStream }, - Tls{ #[pin] transport: tokio_rustls::client::TlsStream}, + Tcp { + #[pin] transport: tokio::net::TcpStream, + }, + Tls { + #[pin] transport: tokio_rustls::client::TlsStream, + }, // Tokio doesn't handle unix sockets on windows - #[cfg(unix)] - Udp{ #[pin] transport: tokio::net::UnixStream }, + Udp { + #[pin] transport: tokio::net::UnixStream, + }, + } +} + +#[cfg(not(unix))] +pin_project! { + #[derive(Debug)] + #[project = ConnStreamProj] + pub enum ConnStream { + Tcp { + #[pin] transport: tokio::net::TcpStream, + }, + Tls { + #[pin] transport: tokio_rustls::client::TlsStream, + }, } } From 1b1983c13e0af016c4dfd49ba49b776d00a756a9 Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Wed, 13 Apr 2022 09:40:19 +0200 Subject: [PATCH 12/13] Impl own pin project --- ddprof-exporter/src/connector/conn_stream.rs | 78 ++++++++++++-------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/ddprof-exporter/src/connector/conn_stream.rs b/ddprof-exporter/src/connector/conn_stream.rs index ba6e8f4..5e25669 100644 --- a/ddprof-exporter/src/connector/conn_stream.rs +++ b/ddprof-exporter/src/connector/conn_stream.rs @@ -8,37 +8,53 @@ use std::{ use futures::{future, Future, FutureExt, TryFutureExt}; use hyper_rustls::HttpsConnector; -use pin_project_lite::pin_project; -#[cfg(unix)] -pin_project! { - #[derive(Debug)] - #[project = ConnStreamProj] - pub enum ConnStream { - Tcp { - #[pin] transport: tokio::net::TcpStream, - }, - Tls { - #[pin] transport: tokio_rustls::client::TlsStream, - }, - // Tokio doesn't handle unix sockets on windows - Udp { - #[pin] transport: tokio::net::UnixStream, - }, - } +#[derive(Debug)] +pub enum ConnStream { + Tcp { + transport: tokio::net::TcpStream, + }, + Tls { + transport: Box>, + }, + #[cfg(unix)] + Udp { + transport: tokio::net::UnixStream, + }, +} + +pub enum ConnStreamProj<'pin> +where + ConnStream: 'pin, +{ + Tcp { + transport: Pin<&'pin mut tokio::net::TcpStream>, + }, + Tls { + transport: Pin<&'pin mut tokio_rustls::client::TlsStream>, + }, + #[cfg(unix)] + Udp { + transport: Pin<&'pin mut tokio::net::UnixStream>, + }, } -#[cfg(not(unix))] -pin_project! { - #[derive(Debug)] - #[project = ConnStreamProj] - pub enum ConnStream { - Tcp { - #[pin] transport: tokio::net::TcpStream, - }, - Tls { - #[pin] transport: tokio_rustls::client::TlsStream, - }, +impl ConnStream { + pub(crate) fn project<'__pin>(self: Pin<&'__pin mut Self>) -> ConnStreamProj<'__pin> { + unsafe { + match self.get_unchecked_mut() { + Self::Tcp { transport } => ConnStreamProj::Tcp { + transport: Pin::new_unchecked(transport), + }, + Self::Tls { transport } => ConnStreamProj::Tls { + transport: Pin::new_unchecked(transport), + }, + #[cfg(unix)] + Self::Udp { transport } => ConnStreamProj::Udp { + transport: Pin::new_unchecked(transport), + }, + } + } } } @@ -86,9 +102,9 @@ impl ConnStream { future::ready(Ok(ConnStream::Tcp { transport: t })) } } - hyper_rustls::MaybeHttpsStream::Https(t) => { - future::ready(Ok(ConnStream::Tls { transport: t })) - } + hyper_rustls::MaybeHttpsStream::Https(t) => future::ready(Ok(ConnStream::Tls { + transport: Box::from(t), + })), }) } } From dd6980e5ece1aa5a521226e2860c11c64a46f50d Mon Sep 17 00:00:00 2001 From: Pawel Chojnacki Date: Wed, 13 Apr 2022 09:53:05 +0200 Subject: [PATCH 13/13] Settle on using pin_project --- Cargo.lock | 22 ++++++++++- ddprof-exporter/Cargo.toml | 2 +- ddprof-exporter/src/connector/conn_stream.rs | 40 +++----------------- 3 files changed, 27 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88af977..decd95e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,7 +133,7 @@ dependencies = [ "maplit", "mime_guess", "percent-encoding", - "pin-project-lite", + "pin-project", "regex", "rustls", "rustls-native-certs", @@ -572,6 +572,26 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pin-project" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.8" diff --git a/ddprof-exporter/Cargo.toml b/ddprof-exporter/Cargo.toml index b915dd1..1f50d24 100644 --- a/ddprof-exporter/Cargo.toml +++ b/ddprof-exporter/Cargo.toml @@ -27,7 +27,7 @@ futures-core = { version = "0.3.0", default-features = false } futures-util = { version = "0.3.0", default-features = false } mime_guess = { version = "2.0", default-features = false } http-body = "0.4" -pin-project-lite = "0.2.8" +pin-project = "1" rustls = { version = "0.20.4", default-features = false } rustls-native-certs = { version = "0.6" } hyper-rustls = { version = "0.23", default-features = false, features = ["native-tokio", "http1", "tls12"] } diff --git a/ddprof-exporter/src/connector/conn_stream.rs b/ddprof-exporter/src/connector/conn_stream.rs index 5e25669..c718806 100644 --- a/ddprof-exporter/src/connector/conn_stream.rs +++ b/ddprof-exporter/src/connector/conn_stream.rs @@ -8,56 +8,26 @@ use std::{ use futures::{future, Future, FutureExt, TryFutureExt}; use hyper_rustls::HttpsConnector; +use pin_project::pin_project; #[derive(Debug)] +#[pin_project(project=ConnStreamProj)] pub enum ConnStream { Tcp { + #[pin] transport: tokio::net::TcpStream, }, Tls { + #[pin] transport: Box>, }, #[cfg(unix)] Udp { + #[pin] transport: tokio::net::UnixStream, }, } -pub enum ConnStreamProj<'pin> -where - ConnStream: 'pin, -{ - Tcp { - transport: Pin<&'pin mut tokio::net::TcpStream>, - }, - Tls { - transport: Pin<&'pin mut tokio_rustls::client::TlsStream>, - }, - #[cfg(unix)] - Udp { - transport: Pin<&'pin mut tokio::net::UnixStream>, - }, -} - -impl ConnStream { - pub(crate) fn project<'__pin>(self: Pin<&'__pin mut Self>) -> ConnStreamProj<'__pin> { - unsafe { - match self.get_unchecked_mut() { - Self::Tcp { transport } => ConnStreamProj::Tcp { - transport: Pin::new_unchecked(transport), - }, - Self::Tls { transport } => ConnStreamProj::Tls { - transport: Pin::new_unchecked(transport), - }, - #[cfg(unix)] - Self::Udp { transport } => ConnStreamProj::Udp { - transport: Pin::new_unchecked(transport), - }, - } - } - } -} - pub type ConnStreamError = Box; use hyper::{client::HttpConnector, service::Service};