Skip to content

Commit

Permalink
Move telemetry::transport to transport::metrics (linkerd#85)
Browse files Browse the repository at this point in the history
Following linkerd#84, the `telemetry::transport` module can be moved into the
`transport` module.

This should allow us to simplify type signatures by combining redundant
types. It's also hoped that we can reduce the API boilerplate around
metrics so it's much easier to instrument and track new metrics in
transport code.
  • Loading branch information
olix0r authored Aug 24, 2018
1 parent 8a9a9bf commit 8ea9a36
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 97 deletions.
6 changes: 3 additions & 3 deletions src/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use watch_service::{WatchService, Rebind};
pub struct Bind<C, B> {
ctx: C,
sensors: telemetry::Sensors,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
tls_client_config: tls::ClientConfigWatch,
_p: PhantomData<fn() -> B>,
}
Expand Down Expand Up @@ -150,7 +150,7 @@ pub type HttpResponse = http::Response<telemetry::http::service::ResponseBody<Ht
pub type HttpRequest<B> = http::Request<telemetry::http::service::RequestBody<B>>;

pub type Client<B> = transparency::Client<
telemetry::transport::Connect<transport::Connect>,
transport::metrics::Connect<transport::Connect>,
::logging::ClientExecutor<&'static str, SocketAddr>,
B,
>;
Expand Down Expand Up @@ -184,7 +184,7 @@ impl Error for BufferSpawnError {
impl<B> Bind<(), B> {
pub fn new(
sensors: telemetry::Sensors,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
tls_client_config: tls::ClientConfigWatch
) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ mod tests {
fn new_inbound(default: Option<net::SocketAddr>, ctx: ctx::Proxy) -> Inbound<()> {
let bind = Bind::new(
::telemetry::Sensors::for_test(),
::telemetry::transport::Registry::default(),
::transport::metrics::Registry::default(),
tls::ClientConfig::no_tls()
);
Inbound::new(default, bind.with_ctx(ctx))
Expand Down
23 changes: 15 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,18 @@ where
);

let (taps, observe) = control::Observe::new(100);
let (sensors, transport_registry, tls_config_sensor, metrics_server) = telemetry::new(
start_time,
config.metrics_retain_idle,
&taps,
);
let (http_sensors, http_report) = telemetry::http::new(config.metrics_retain_idle, &taps);

let (transport_registry, transport_report) = transport::metrics::new();

let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new();

let report = telemetry::Report::new(
http_report,
transport_report,
tls_config_report,
telemetry::process::Report::new(start_time),
);

let tls_client_config = tls_config_watch.client.clone();
let tls_cfg_bg = tls_config_watch.start(tls_config_sensor);
Expand Down Expand Up @@ -289,7 +296,7 @@ where
let (drain_tx, drain_rx) = drain::channel();

let bind = Bind::new(
sensors.clone(),
http_sensors.clone(),
transport_registry.clone(),
tls_client_config
);
Expand Down Expand Up @@ -359,7 +366,7 @@ where
let metrics = control::serve_http(
"metrics",
metrics_listener,
metrics_server,
linkerd2_metrics::Serve::new(report),
);

rt.spawn(::logging::admin().bg("resolver").future(resolver_bg));
Expand Down Expand Up @@ -402,7 +409,7 @@ fn serve<R, B, E, F, G>(
tcp_connect_timeout: Duration,
disable_protocol_detection_ports: IndexSet<u16>,
proxy_ctx: ctx::Proxy,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
get_orig_dst: G,
drain_rx: drain::Watch,
) -> impl Future<Item = (), Error = io::Error> + Send + 'static
Expand Down
22 changes: 2 additions & 20 deletions src/telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,15 @@
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use linkerd2_metrics as metrics;

mod errno;
pub mod http;
mod process;
pub mod process;
mod report;
pub mod tap;
pub mod tls_config_reload;
pub mod transport;

use self::errno::Errno;
pub use self::errno::Errno;
pub use self::http::event::Event;
pub use self::report::Report;
pub use self::http::Sensors;

pub type ServeMetrics = metrics::Serve<Report>;

pub fn new(
start_time: SystemTime,
metrics_retain_idle: Duration,
taps: &Arc<Mutex<tap::Taps>>,
) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) {
let process = process::Report::new(start_time);
let (http_sensors, http_report) = http::new(metrics_retain_idle, taps);
let (transport_registry, transport_report) = transport::new();
let (tls_config_sensor, tls_config_report) = tls_config_reload::new();

let report = Report::new(http_report, transport_report, tls_config_report, process);
(http_sensors, transport_registry, tls_config_sensor, ServeMetrics::new(report))
}
5 changes: 3 additions & 2 deletions src/telemetry/report.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt;

use super::{http, process, tls_config_reload, transport};
use transport::metrics as transport;
use super::{http, process, tls_config_reload};
use super::metrics::FmtMetrics;

/// Implements `FmtMetrics` to report runtime metrics.
Expand All @@ -15,7 +16,7 @@ pub struct Report {
// ===== impl Report =====

impl Report {
pub(super) fn new(
pub fn new(
http: http::Report,
transports: transport::Report,
tls_config_reload: tls_config_reload::Report,
Expand Down
8 changes: 3 additions & 5 deletions src/transparency/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::NewService;
use tower_h2;

use transport::{Connection, Peek};
use ctx::Proxy as ProxyCtx;
use ctx::transport::{Server as ServerCtx};
use drain;
use telemetry;
use transport::GetOriginalDst;
use transport::{self, Connection, GetOriginalDst, Peek};
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
use super::protocol::Protocol;
use super::tcp;
Expand Down Expand Up @@ -47,7 +45,7 @@ where
listen_addr: SocketAddr,
new_service: S,
proxy_ctx: ProxyCtx,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
tcp: tcp::Proxy,
log: ::logging::Server,
}
Expand Down Expand Up @@ -76,7 +74,7 @@ where
pub fn new(
listen_addr: SocketAddr,
proxy_ctx: ProxyCtx,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
get_orig_dst: G,
stack: S,
tcp_connect_timeout: Duration,
Expand Down
5 changes: 2 additions & 3 deletions src/transparency/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use ctx::transport::{
Client as ClientCtx,
Server as ServerCtx,
};
use telemetry;
use timeout::Timeout;
use transport::{self, tls};
use ctx::transport::TlsStatus;
Expand All @@ -22,14 +21,14 @@ use ctx::transport::TlsStatus;
#[derive(Debug, Clone)]
pub struct Proxy {
connect_timeout: Duration,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
}

impl Proxy {
/// Create a new TCP `Proxy`.
pub fn new(
connect_timeout: Duration,
transport_registry: telemetry::transport::Registry
transport_registry: transport::metrics::Registry
) -> Self {
Self {
connect_timeout,
Expand Down
File renamed without changes.
59 changes: 4 additions & 55 deletions src/telemetry/transport/mod.rs → src/transport/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_connect;

use ctx;
use telemetry::Errno;
use telemetry::metrics::{
use linkerd2_metrics::{
latency,
Counter,
FmtLabels,
Expand All @@ -17,6 +15,9 @@ use telemetry::metrics::{
Histogram,
Metric,
};

use ctx;
use telemetry::Errno;
use transport::Connection;

mod io;
Expand Down Expand Up @@ -197,50 +198,6 @@ impl Registry {
}
}

#[cfg(test)]
impl Registry {

pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 {
self.0.lock().unwrap().0
.get(&Key::new(ctx))
.map(|m| m.lock().unwrap().open_total.into())
.unwrap_or(0)
}

// pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 {
// self.0.lock().unwrap().0
// .get(&Key::new(ctx))
// .map(|m| m.lock().unwrap().open_connections.into())
// .unwrap_or(0)
// }

pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) {
self.0.lock().unwrap().0
.get(&Key::new(ctx))
.map(|m| {
let m = m.lock().unwrap();
(m.read_bytes_total.into(), m.write_bytes_total.into())
})
.unwrap_or((0, 0))
}

pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 {
self.0.lock().unwrap().0
.get(&Key::new(ctx))
.and_then(move |m| m.lock().unwrap().by_eos.get(&eos).map(|m| m.close_total.into()))
.unwrap_or(0)
}

pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram<latency::Ms> {
self.0.lock().unwrap().0
.get(&Key::new(ctx))
.and_then(move |m| {
m.lock().unwrap().by_eos.get(&eos).map(|m| m.connection_duration.clone())
})
.unwrap_or_default()
}
}

// ===== impl Report =====

impl FmtMetrics for Report {
Expand Down Expand Up @@ -343,14 +300,6 @@ impl NewSensor {
// ===== impl Key =====

impl Key {
#[cfg(test)]
fn new(ctx: &ctx::transport::Ctx) -> Self {
match ctx {
ctx::transport::Ctx::Client(ctx) => Self::client(ctx),
ctx::transport::Ctx::Server(ctx) => Self::server(ctx),
}
}

fn client(ctx: &ctx::transport::Client) -> Self {
Self {
proxy: ctx.proxy,
Expand Down
1 change: 1 addition & 0 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod connect;
mod connection;
mod addr_info;
mod io;
pub mod metrics;
mod prefixed;
pub mod tls;

Expand Down

0 comments on commit 8ea9a36

Please sign in to comment.