Skip to content
This repository has been archived by the owner on Jun 28, 2022. It is now read-only.

Merge fixes for hyper certs #48

Merged
merged 1 commit into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 43 additions & 40 deletions ddprof-exporter/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ pub(crate) mod uds {

pin_project! {
#[project = ConnStreamProj]
pub(crate) enum ConnStream {
pub enum ConnStream {
Tcp{ #[pin] transport: tokio::net::TcpStream },
Tls{ #[pin] transport: tokio_rustls::client::TlsStream<tokio::net::TcpStream>},
Udp{ #[pin] transport: tokio::net::UnixStream },
}
}
}

use futures::{TryFutureExt, FutureExt};
use futures::future::MaybeDone;
use futures::{FutureExt, TryFutureExt};
use hyper::client::HttpConnector;
use hyper::service::Service;
use hyper_rustls::MaybeHttpsStream;
Expand All @@ -85,7 +85,8 @@ use uds::{ConnStream, ConnStreamProj};
pin_project_lite::pin_project! {
#[project = ConnStreamProj]
pub(crate) enum ConnStream {
Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream<tokio::net::TcpStream> },
Tcp{ #[pin] transport: tokio::net::TcpStream },
Tls{ #[pin] transport: tokio_rustls::client::TlsStream<tokio::net::TcpStream>},
}
}

Expand All @@ -106,31 +107,29 @@ impl MaybeHttpsConnector {

fn build_https_connector() -> Option<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>> {
let certs = load_root_certs()?;
let client_config =
ClientConfig::builder()
let client_config = ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(certs)
.with_no_client_auth();
Some(hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(client_config)
.https_or_http()
.enable_http1()
.build())
Some(
hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(client_config)
.https_or_http()
.enable_http1()
.build(),
)
}

fn load_root_certs() -> Option<rustls::RootCertStore> {
let mut roots = rustls::RootCertStore::empty();
let mut valid_count = 0;
let mut invalid_count = 0;

for cert in rustls_native_certs::load_native_certs().ok()?
{
for cert in rustls_native_certs::load_native_certs().ok()? {
let cert = rustls::Certificate(cert.0);
match roots.add(&cert) {
Ok(_) => valid_count += 1,
Err(err) => {
invalid_count += 1
}
Err(err) => invalid_count += 1,
}
}
if roots.is_empty() {
Expand Down Expand Up @@ -160,7 +159,8 @@ impl hyper::client::connect::Connection for ConnStream {
Self::Tcp { transport } => transport.connected(),
Self::Tls { transport } => {
let (tcp, tls) = transport.get_ref();
if tls.alpn_protocol() == Some(b"h2") { // TODO/QUESTION: is it safe, future proof, to implement this ourselves ?
if tls.alpn_protocol() == Some(b"h2") {
// TODO/QUESTION: is it safe, future proof, to implement this ourselves ?
tcp.connected().negotiated_h2()
} else {
tcp.connected()
Expand Down Expand Up @@ -233,40 +233,42 @@ impl hyper::service::Service<hyper::Uri> for MaybeHttpsConnector {
}
}),
Some("https") => match self {
Self::Http(_) => todo!(),// TODO: return error
Self::Http(_) => todo!(), // TODO: return error
Self::Https(c) => {
let fut = c.call(uri);
Box::pin(async {
match c.call(uri).await? {
match fut.await? {
MaybeHttpsStream::Http(_) => todo!(),
MaybeHttpsStream::Https(t) => {
Ok(ConnStream::Tls {
transport: t
})
},
MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }),
}
})
},
}
},
_ => match self {
Self::Http(c) => {
let fut = c.call(uri);
Box::pin(async {
Ok(ConnStream::Tcp {
transport: fut.await?,
})
})
}
Self::Https(c) => {
let fut = c.call(uri);
Box::pin(async {
match fut.await? {
MaybeHttpsStream::Http(t) => Ok(ConnStream::Tcp { transport: t }),
MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }),
}
})
}
},
_ => {
let fut = async {
let stream = match self {
MaybeHttpsConnector::Http(c) => ConnStream::Tcp { transport: c.call(uri).await? },
MaybeHttpsConnector::Https(c) => match c.call(uri).await? {
MaybeHttpsStream::Http(t) => ConnStream::Tcp { transport: t},
MaybeHttpsStream::Https(t) => ConnStream::Tls{ transport: t},
},
};

Ok(stream)
};
Box::pin(fut)
}
}
}

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into() ),
MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into()),
MaybeHttpsConnector::Https(c) => c.poll_ready(cx),
}
}
Expand All @@ -280,6 +282,7 @@ mod tests {
/// Verify that the Connector type implements the correct bound Connect + Clone
/// to be able to use the hyper::Client
fn test_hyper_client_from_connector() {
let _: hyper::Client<MaybeHttpsConnector> = hyper::Client::builder().build(MaybeHttpsConnector::new());
let _: hyper::Client<MaybeHttpsConnector> =
hyper::Client::builder().build(MaybeHttpsConnector::new());
}
}
4 changes: 2 additions & 2 deletions ddprof-exporter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use connector::uds::socket_path_to_uri;
const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0);
const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID";

type HttpClient = hyper::Client<connector::Connector, hyper::Body>;
type HttpClient = hyper::Client<connector::MaybeHttpsConnector, hyper::Body>;

pub struct Exporter {
client: HttpClient,
Expand Down Expand Up @@ -227,7 +227,7 @@ impl Exporter {
// Set idle to 0, which prevents the pipe being broken every 2nd request
let client = hyper::Client::builder()
.pool_max_idle_per_host(0)
.build(connector::Connector::new());
.build(connector::MaybeHttpsConnector::new());
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
Expand Down