Skip to content

Commit

Permalink
Apply tapping logic only when taps are active (linkerd#142)
Browse files Browse the repository at this point in the history
Previously, as the proxy processed requests, it would:

Obtain the taps mutex ~4x per request to determine whether taps are active.
Construct an "event" ~4x per request, regardless of whether any taps were
active.
Furthermore, this relied on fragile caching logic, where the grpc server
manages individual stream states in a Map to determine when all streams have
been completed. And, beyond the complexity of caching, this approach makes it
difficult to expand Tap functionality (for instance, to support tapping of
payloads).

This change entirely rewrites the proxy's Tap logic to (1) prevent the need
to acquire muteces in the request path, (2) only produce events as needed to
satisfy tap requests, and (3) provide clear (private) API boundaries between
the Tap server and Stack, completely hiding gRPC details from the tap service.

The tap::service module now provides a middleware that is generic over a
way to discover Taps; and the tap::grpc module (previously,
control::observe), implements a gRPC service that advertises Taps such that
their lifetimes are managed properly, leveraging RAII instead of hand-rolled
map-based caching.

There is one user-facing change: tap stream IDs are now calculated relative to
the tap server. The base id is assigned from the count of tap requests that have
been made to the proxy; and the stream ID corresponds to an integer on [0, limit).
  • Loading branch information
olix0r committed Nov 30, 2018
1 parent edd124f commit 82524e4
Show file tree
Hide file tree
Showing 20 changed files with 1,701 additions and 1,609 deletions.
2 changes: 0 additions & 2 deletions lib/stack/src/stack_per_request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(dead_code)]

use futures::Poll;
use std::fmt;

Expand Down
45 changes: 29 additions & 16 deletions src/app/inbound.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use http;
use indexmap::IndexMap;
use std::fmt;
use std::net::SocketAddr;

Expand Down Expand Up @@ -33,10 +34,6 @@ impl classify::CanClassify for Endpoint {
}

impl Endpoint {
pub fn dst_name(&self) -> Option<&NameAddr> {
self.dst_name.as_ref()
}

fn target(&self) -> connect::Target {
let tls = Conditional::None(tls::ReasonForNoTls::InternalTraffic);
connect::Target::new(self.addr, tls)
Expand All @@ -49,13 +46,32 @@ impl settings::router::HasConnect for Endpoint {
}
}

impl From<Endpoint> for tap::Endpoint {
fn from(ep: Endpoint) -> Self {
tap::Endpoint {
direction: tap::Direction::In,
target: ep.target(),
labels: Default::default(),
}
impl tap::Inspect for Endpoint {
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
req.extensions().get::<Source>().map(|s| s.remote)
}

fn src_tls<B>(&self, req: &http::Request<B>) -> tls::Status {
req.extensions()
.get::<Source>()
.map(|s| s.tls_status)
.unwrap_or_else(|| Conditional::None(tls::ReasonForNoTls::Disabled))
}

fn dst_addr<B>(&self, _: &http::Request<B>) -> Option<SocketAddr> {
Some(self.addr)
}

fn dst_labels<B>(&self, _: &http::Request<B>) -> Option<&IndexMap<String, String>> {
None
}

fn dst_tls<B>(&self, _: &http::Request<B>) -> tls::Status {
Conditional::None(tls::ReasonForNoTls::InternalTraffic)
}

fn is_outbound<B>(&self, _: &http::Request<B>) -> bool {
false
}
}

Expand Down Expand Up @@ -103,10 +119,10 @@ impl<A> router::Recognize<http::Request<A>> for RecognizeEndpoint {
}

pub mod orig_proto_downgrade {
use std::marker::PhantomData;
use http;
use proxy::http::orig_proto;
use proxy::server::Source;
use std::marker::PhantomData;
use svc;

#[derive(Debug)]
Expand Down Expand Up @@ -168,10 +184,7 @@ pub mod orig_proto_downgrade {

fn make(&self, target: &Source) -> Result<Self::Value, Self::Error> {
debug!("downgrading requests; source={:?}", target);
self
.inner
.make(&target)
.map(orig_proto::Downgrade::new)
self.inner.make(&target).map(orig_proto::Downgrade::new)
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ where
panic!("invalid DNS configuration: {:?}", e);
});

let tap_next_id = tap::NextId::default();
let (taps, observe) = control::Observe::new(100);
let (tap_layer, tap_grpc, tap_daemon) = tap::new();

let (ctl_http_metrics, ctl_http_report) = {
let (m, r) = http_metrics::new::<ControlLabels, Class>(config.metrics_retain_idle);
Expand Down Expand Up @@ -329,7 +328,7 @@ where
.push(buffer::layer())
.push(settings::router::layer::<Endpoint, _>())
.push(orig_proto_upgrade::layer())
.push(tap::layer(tap_next_id.clone(), taps.clone()))
.push(tap_layer.clone())
.push(metrics::layer::<_, classify::Response>(
endpoint_http_metrics,
))
Expand Down Expand Up @@ -478,7 +477,8 @@ where
let endpoint_router = client_stack
.push(buffer::layer())
.push(settings::router::layer::<Endpoint, _>())
.push(tap::layer(tap_next_id, taps))
.push(phantom_data::layer())
.push(tap_layer)
.push(http_metrics::layer::<_, classify::Response>(
endpoint_http_metrics,
))
Expand Down Expand Up @@ -593,19 +593,19 @@ where
let mut rt =
current_thread::Runtime::new().expect("initialize admin thread runtime");

let tap = serve_tap(control_listener, TapServer::new(observe));

let metrics = control::serve_http(
"metrics",
metrics_listener,
metrics::Serve::new(report),
);

// tap is already wrapped in a logging Future.
rt.spawn(tap);
// metrics_server is already wrapped in a logging Future.
rt.spawn(tap_daemon.map_err(|_| ()));
rt.spawn(serve_tap(control_listener, TapServer::new(tap_grpc)));

rt.spawn(metrics);

rt.spawn(::logging::admin().bg("dns-resolver").future(dns_bg));

rt.spawn(
::logging::admin()
.bg("resolver")
Expand Down
40 changes: 30 additions & 10 deletions src/app/outbound.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::fmt;
use indexmap::IndexMap;
use std::{fmt, net};

use control::destination::{Metadata, ProtocolHint};
use proxy::http::settings;
use svc;
use tap;
use transport::{connect, tls};
use NameAddr;
use {Conditional, NameAddr};

#[derive(Clone, Debug)]
pub struct Endpoint {
Expand Down Expand Up @@ -52,17 +53,36 @@ impl svc::watch::WithUpdate<tls::ConditionalClientConfig> for Endpoint {
}
}

impl From<Endpoint> for tap::Endpoint {
fn from(ep: Endpoint) -> Self {
// TODO add route labels...
tap::Endpoint {
direction: tap::Direction::Out,
labels: ep.metadata.labels().clone(),
target: ep.connect.clone(),
}
impl tap::Inspect for Endpoint {

fn src_addr<B>(&self, req: &http::Request<B>) -> Option<net::SocketAddr> {
use proxy::server::Source;

req.extensions().get::<Source>().map(|s| s.remote)
}

fn src_tls<B>(&self, _: &http::Request<B>) -> tls::Status {
Conditional::None(tls::ReasonForNoTls::InternalTraffic)
}

fn dst_addr<B>(&self, _: &http::Request<B>) -> Option<net::SocketAddr> {
Some(self.connect.addr)
}

fn dst_labels<B>(&self, _: &http::Request<B>) -> Option<&IndexMap<String, String>> {
Some(self.metadata.labels())
}

fn dst_tls<B>(&self, _: &http::Request<B>) -> tls::Status {
self.metadata.tls_status()
}

fn is_outbound<B>(&self, _: &http::Request<B>) -> bool {
true
}
}


pub mod discovery {
use futures::{Async, Poll};
use std::net::SocketAddr;
Expand Down
4 changes: 4 additions & 0 deletions src/control/destination/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,8 @@ impl Metadata {
pub fn tls_identity(&self) -> Conditional<&tls::Identity, tls::ReasonForNoIdentity> {
self.tls_identity.as_ref()
}

pub fn tls_status(&self) -> tls::Status {
self.tls_identity().map(|_| ())
}
}
3 changes: 0 additions & 3 deletions src/control/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
mod cache;
pub mod destination;
mod observe;
pub mod pb;
mod remote_stream;
mod serve_http;

pub use self::observe::Observe;
pub use self::serve_http::serve_http;
178 changes: 0 additions & 178 deletions src/control/observe.rs

This file was deleted.

Loading

0 comments on commit 82524e4

Please sign in to comment.