Skip to content
This repository has been archived by the owner on Jun 28, 2022. It is now read-only.

Commit

Permalink
Tentative fix (#48)
Browse files Browse the repository at this point in the history
Co-authored-by: Nicolas Savoire <nicolas.savoire@datadoghq.com>
  • Loading branch information
r1viollet and nsavoire authored Apr 11, 2022
1 parent 13b2098 commit f78daf3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 42 deletions.
83 changes: 43 additions & 40 deletions ddprof-exporter/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ pub(crate) mod uds {

pin_project! {
#[project = ConnStreamProj]
pub(crate) enum ConnStream {
pub enum ConnStream {
Tcp{ #[pin] transport: tokio::net::TcpStream },
Tls{ #[pin] transport: tokio_rustls::client::TlsStream<tokio::net::TcpStream>},
Udp{ #[pin] transport: tokio::net::UnixStream },
}
}
}

use futures::{TryFutureExt, FutureExt};
use futures::future::MaybeDone;
use futures::{FutureExt, TryFutureExt};
use hyper::client::HttpConnector;
use hyper::service::Service;
use hyper_rustls::MaybeHttpsStream;
Expand All @@ -85,7 +85,8 @@ use uds::{ConnStream, ConnStreamProj};
pin_project_lite::pin_project! {
#[project = ConnStreamProj]
pub(crate) enum ConnStream {
Tcp{ #[pin] transport: hyper_rustls::MaybeHttpsStream<tokio::net::TcpStream> },
Tcp{ #[pin] transport: tokio::net::TcpStream },
Tls{ #[pin] transport: tokio_rustls::client::TlsStream<tokio::net::TcpStream>},
}
}

Expand All @@ -106,31 +107,29 @@ impl MaybeHttpsConnector {

fn build_https_connector() -> Option<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>> {
let certs = load_root_certs()?;
let client_config =
ClientConfig::builder()
let client_config = ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(certs)
.with_no_client_auth();
Some(hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(client_config)
.https_or_http()
.enable_http1()
.build())
Some(
hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(client_config)
.https_or_http()
.enable_http1()
.build(),
)
}

fn load_root_certs() -> Option<rustls::RootCertStore> {
let mut roots = rustls::RootCertStore::empty();
let mut valid_count = 0;
let mut invalid_count = 0;

for cert in rustls_native_certs::load_native_certs().ok()?
{
for cert in rustls_native_certs::load_native_certs().ok()? {
let cert = rustls::Certificate(cert.0);
match roots.add(&cert) {
Ok(_) => valid_count += 1,
Err(err) => {
invalid_count += 1
}
Err(err) => invalid_count += 1,
}
}
if roots.is_empty() {
Expand Down Expand Up @@ -160,7 +159,8 @@ impl hyper::client::connect::Connection for ConnStream {
Self::Tcp { transport } => transport.connected(),
Self::Tls { transport } => {
let (tcp, tls) = transport.get_ref();
if tls.alpn_protocol() == Some(b"h2") { // TODO/QUESTION: is it safe, future proof, to implement this ourselves ?
if tls.alpn_protocol() == Some(b"h2") {
// TODO/QUESTION: is it safe, future proof, to implement this ourselves ?
tcp.connected().negotiated_h2()
} else {
tcp.connected()
Expand Down Expand Up @@ -233,40 +233,42 @@ impl hyper::service::Service<hyper::Uri> for MaybeHttpsConnector {
}
}),
Some("https") => match self {
Self::Http(_) => todo!(),// TODO: return error
Self::Http(_) => todo!(), // TODO: return error
Self::Https(c) => {
let fut = c.call(uri);
Box::pin(async {
match c.call(uri).await? {
match fut.await? {
MaybeHttpsStream::Http(_) => todo!(),
MaybeHttpsStream::Https(t) => {
Ok(ConnStream::Tls {
transport: t
})
},
MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }),
}
})
},
}
},
_ => match self {
Self::Http(c) => {
let fut = c.call(uri);
Box::pin(async {
Ok(ConnStream::Tcp {
transport: fut.await?,
})
})
}
Self::Https(c) => {
let fut = c.call(uri);
Box::pin(async {
match fut.await? {
MaybeHttpsStream::Http(t) => Ok(ConnStream::Tcp { transport: t }),
MaybeHttpsStream::Https(t) => Ok(ConnStream::Tls { transport: t }),
}
})
}
},
_ => {
let fut = async {
let stream = match self {
MaybeHttpsConnector::Http(c) => ConnStream::Tcp { transport: c.call(uri).await? },
MaybeHttpsConnector::Https(c) => match c.call(uri).await? {
MaybeHttpsStream::Http(t) => ConnStream::Tcp { transport: t},
MaybeHttpsStream::Https(t) => ConnStream::Tls{ transport: t},
},
};

Ok(stream)
};
Box::pin(fut)
}
}
}

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into() ),
MaybeHttpsConnector::Http(c) => c.poll_ready(cx).map_err(|e| e.into()),
MaybeHttpsConnector::Https(c) => c.poll_ready(cx),
}
}
Expand All @@ -280,6 +282,7 @@ mod tests {
/// Verify that the Connector type implements the correct bound Connect + Clone
/// to be able to use the hyper::Client
fn test_hyper_client_from_connector() {
let _: hyper::Client<MaybeHttpsConnector> = hyper::Client::builder().build(MaybeHttpsConnector::new());
let _: hyper::Client<MaybeHttpsConnector> =
hyper::Client::builder().build(MaybeHttpsConnector::new());
}
}
4 changes: 2 additions & 2 deletions ddprof-exporter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use connector::uds::socket_path_to_uri;
const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0);
const DATADOG_CONTAINER_ID_HEADER: &str = "Datadog-Container-ID";

type HttpClient = hyper::Client<connector::Connector, hyper::Body>;
type HttpClient = hyper::Client<connector::MaybeHttpsConnector, hyper::Body>;

pub struct Exporter {
client: HttpClient,
Expand Down Expand Up @@ -227,7 +227,7 @@ impl Exporter {
// Set idle to 0, which prevents the pipe being broken every 2nd request
let client = hyper::Client::builder()
.pool_max_idle_per_host(0)
.build(connector::Connector::new());
.build(connector::MaybeHttpsConnector::new());
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
Expand Down

0 comments on commit f78daf3

Please sign in to comment.