From 82524e4a1f5d846fbcae259c92253543d067d6c6 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 29 Nov 2018 17:12:48 -0800 Subject: [PATCH] Apply tapping logic only when taps are active (#142) Previously, as the proxy processed requests, it would: Obtain the taps mutex ~4x per request to determine whether taps are active. Construct an "event" ~4x per request, regardless of whether any taps were active. Furthermore, this relied on fragile caching logic, where the grpc server manages individual stream states in a Map to determine when all streams have been completed. And, beyond the complexity of caching, this approach makes it difficult to expand Tap functionality (for instance, to support tapping of payloads). This change entirely rewrites the proxy's Tap logic to (1) prevent the need to acquire muteces in the request path, (2) only produce events as needed to satisfy tap requests, and (3) provide clear (private) API boundaries between the Tap server and Stack, completely hiding gRPC details from the tap service. The tap::service module now provides a middleware that is generic over a way to discover Taps; and the tap::grpc module (previously, control::observe), implements a gRPC service that advertises Taps such that their lifetimes are managed properly, leveraging RAII instead of hand-rolled map-based caching. There is one user-facing change: tap stream IDs are now calculated relative to the tap server. The base id is assigned from the count of tap requests that have been made to the proxy; and the stream ID corresponds to an integer on [0, limit). --- lib/stack/src/stack_per_request.rs | 2 - src/app/inbound.rs | 45 ++- src/app/main.rs | 18 +- src/app/outbound.rs | 40 +- src/control/destination/mod.rs | 4 + src/control/mod.rs | 3 - src/control/observe.rs | 178 --------- src/control/pb.rs | 210 ---------- src/proxy/http/mod.rs | 6 + src/proxy/http/profiles.rs | 2 - src/proxy/http/settings.rs | 13 +- src/tap/ctx.rs | 85 ---- src/tap/daemon.rs | 200 ++++++++++ src/tap/event.rs | 85 ---- src/tap/grpc/match_.rs | 471 ++++++++++++++++++++++ src/tap/grpc/mod.rs | 4 + src/tap/grpc/server.rs | 549 +++++++++++++++++++++++++ src/tap/match_.rs | 543 ------------------------- src/tap/mod.rs | 237 +++++++---- src/tap/service.rs | 615 +++++++++++------------------ 20 files changed, 1701 insertions(+), 1609 deletions(-) delete mode 100644 src/control/observe.rs delete mode 100644 src/control/pb.rs delete mode 100644 src/tap/ctx.rs create mode 100644 src/tap/daemon.rs delete mode 100644 src/tap/event.rs create mode 100644 src/tap/grpc/match_.rs create mode 100644 src/tap/grpc/mod.rs create mode 100644 src/tap/grpc/server.rs delete mode 100644 src/tap/match_.rs diff --git a/lib/stack/src/stack_per_request.rs b/lib/stack/src/stack_per_request.rs index 46adb66551326..a3b5e27bdb905 100644 --- a/lib/stack/src/stack_per_request.rs +++ b/lib/stack/src/stack_per_request.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use futures::Poll; use std::fmt; diff --git a/src/app/inbound.rs b/src/app/inbound.rs index 86aeab9c7f80b..338f72682fc0f 100644 --- a/src/app/inbound.rs +++ b/src/app/inbound.rs @@ -1,4 +1,5 @@ use http; +use indexmap::IndexMap; use std::fmt; use std::net::SocketAddr; @@ -33,10 +34,6 @@ impl classify::CanClassify for Endpoint { } impl Endpoint { - pub fn dst_name(&self) -> Option<&NameAddr> { - self.dst_name.as_ref() - } - fn target(&self) -> connect::Target { let tls = Conditional::None(tls::ReasonForNoTls::InternalTraffic); connect::Target::new(self.addr, tls) @@ -49,13 +46,32 @@ impl settings::router::HasConnect for Endpoint { } } -impl From for tap::Endpoint { - fn from(ep: Endpoint) -> Self { - tap::Endpoint { - direction: tap::Direction::In, - target: ep.target(), - labels: Default::default(), - } +impl tap::Inspect for Endpoint { + fn src_addr(&self, req: &http::Request) -> Option { + req.extensions().get::().map(|s| s.remote) + } + + fn src_tls(&self, req: &http::Request) -> tls::Status { + req.extensions() + .get::() + .map(|s| s.tls_status) + .unwrap_or_else(|| Conditional::None(tls::ReasonForNoTls::Disabled)) + } + + fn dst_addr(&self, _: &http::Request) -> Option { + Some(self.addr) + } + + fn dst_labels(&self, _: &http::Request) -> Option<&IndexMap> { + None + } + + fn dst_tls(&self, _: &http::Request) -> tls::Status { + Conditional::None(tls::ReasonForNoTls::InternalTraffic) + } + + fn is_outbound(&self, _: &http::Request) -> bool { + false } } @@ -103,10 +119,10 @@ impl router::Recognize> for RecognizeEndpoint { } pub mod orig_proto_downgrade { - use std::marker::PhantomData; use http; use proxy::http::orig_proto; use proxy::server::Source; + use std::marker::PhantomData; use svc; #[derive(Debug)] @@ -168,10 +184,7 @@ pub mod orig_proto_downgrade { fn make(&self, target: &Source) -> Result { debug!("downgrading requests; source={:?}", target); - self - .inner - .make(&target) - .map(orig_proto::Downgrade::new) + self.inner.make(&target).map(orig_proto::Downgrade::new) } } } diff --git a/src/app/main.rs b/src/app/main.rs index c43779cfed0a2..0f1dc4534d2fd 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -193,8 +193,7 @@ where panic!("invalid DNS configuration: {:?}", e); }); - let tap_next_id = tap::NextId::default(); - let (taps, observe) = control::Observe::new(100); + let (tap_layer, tap_grpc, tap_daemon) = tap::new(); let (ctl_http_metrics, ctl_http_report) = { let (m, r) = http_metrics::new::(config.metrics_retain_idle); @@ -329,7 +328,7 @@ where .push(buffer::layer()) .push(settings::router::layer::()) .push(orig_proto_upgrade::layer()) - .push(tap::layer(tap_next_id.clone(), taps.clone())) + .push(tap_layer.clone()) .push(metrics::layer::<_, classify::Response>( endpoint_http_metrics, )) @@ -478,7 +477,8 @@ where let endpoint_router = client_stack .push(buffer::layer()) .push(settings::router::layer::()) - .push(tap::layer(tap_next_id, taps)) + .push(phantom_data::layer()) + .push(tap_layer) .push(http_metrics::layer::<_, classify::Response>( endpoint_http_metrics, )) @@ -593,19 +593,19 @@ where let mut rt = current_thread::Runtime::new().expect("initialize admin thread runtime"); - let tap = serve_tap(control_listener, TapServer::new(observe)); - let metrics = control::serve_http( "metrics", metrics_listener, metrics::Serve::new(report), ); - // tap is already wrapped in a logging Future. - rt.spawn(tap); - // metrics_server is already wrapped in a logging Future. + rt.spawn(tap_daemon.map_err(|_| ())); + rt.spawn(serve_tap(control_listener, TapServer::new(tap_grpc))); + rt.spawn(metrics); + rt.spawn(::logging::admin().bg("dns-resolver").future(dns_bg)); + rt.spawn( ::logging::admin() .bg("resolver") diff --git a/src/app/outbound.rs b/src/app/outbound.rs index fb33fbdbbe0d4..b42ecc1b2e996 100644 --- a/src/app/outbound.rs +++ b/src/app/outbound.rs @@ -1,11 +1,12 @@ -use std::fmt; +use indexmap::IndexMap; +use std::{fmt, net}; use control::destination::{Metadata, ProtocolHint}; use proxy::http::settings; use svc; use tap; use transport::{connect, tls}; -use NameAddr; +use {Conditional, NameAddr}; #[derive(Clone, Debug)] pub struct Endpoint { @@ -52,17 +53,36 @@ impl svc::watch::WithUpdate for Endpoint { } } -impl From for tap::Endpoint { - fn from(ep: Endpoint) -> Self { - // TODO add route labels... - tap::Endpoint { - direction: tap::Direction::Out, - labels: ep.metadata.labels().clone(), - target: ep.connect.clone(), - } +impl tap::Inspect for Endpoint { + + fn src_addr(&self, req: &http::Request) -> Option { + use proxy::server::Source; + + req.extensions().get::().map(|s| s.remote) + } + + fn src_tls(&self, _: &http::Request) -> tls::Status { + Conditional::None(tls::ReasonForNoTls::InternalTraffic) + } + + fn dst_addr(&self, _: &http::Request) -> Option { + Some(self.connect.addr) + } + + fn dst_labels(&self, _: &http::Request) -> Option<&IndexMap> { + Some(self.metadata.labels()) + } + + fn dst_tls(&self, _: &http::Request) -> tls::Status { + self.metadata.tls_status() + } + + fn is_outbound(&self, _: &http::Request) -> bool { + true } } + pub mod discovery { use futures::{Async, Poll}; use std::net::SocketAddr; diff --git a/src/control/destination/mod.rs b/src/control/destination/mod.rs index dad8506f5087e..51c75132beaa1 100644 --- a/src/control/destination/mod.rs +++ b/src/control/destination/mod.rs @@ -222,4 +222,8 @@ impl Metadata { pub fn tls_identity(&self) -> Conditional<&tls::Identity, tls::ReasonForNoIdentity> { self.tls_identity.as_ref() } + + pub fn tls_status(&self) -> tls::Status { + self.tls_identity().map(|_| ()) + } } diff --git a/src/control/mod.rs b/src/control/mod.rs index af0c8cec53fd5..7a3f32be234f0 100644 --- a/src/control/mod.rs +++ b/src/control/mod.rs @@ -1,9 +1,6 @@ mod cache; pub mod destination; -mod observe; -pub mod pb; mod remote_stream; mod serve_http; -pub use self::observe::Observe; pub use self::serve_http::serve_http; diff --git a/src/control/observe.rs b/src/control/observe.rs deleted file mode 100644 index e4d149978be48..0000000000000 --- a/src/control/observe.rs +++ /dev/null @@ -1,178 +0,0 @@ -use futures::{future, Poll, Stream}; -use futures_mpsc_lossy; -use http::HeaderMap; -use indexmap::IndexMap; -use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use tower_grpc::{self as grpc, Response}; - -use api::tap::{server, ObserveRequest, TapEvent}; -use convert::*; -use tap::{event, Event, Tap, Taps}; - -#[derive(Clone, Debug)] -pub struct Observe { - next_id: Arc, - taps: Arc>, - tap_capacity: usize, -} - -pub struct TapEvents { - rx: futures_mpsc_lossy::Receiver, - remaining: usize, - current: IndexMap, - tap_id: usize, - taps: Arc>, -} - -impl Observe { - pub fn new(tap_capacity: usize) -> (Arc>, Observe) { - let taps = Arc::new(Mutex::new(Taps::default())); - - let observe = Observe { - next_id: Arc::new(AtomicUsize::new(0)), - tap_capacity, - taps: taps.clone(), - }; - - (taps, observe) - } -} - -impl server::Tap for Observe { - type ObserveStream = TapEvents; - type ObserveFuture = future::FutureResult, grpc::Error>; - - fn observe(&mut self, req: grpc::Request) -> Self::ObserveFuture { - if self.next_id.load(Ordering::Acquire) == ::std::usize::MAX { - return future::err(grpc::Error::Grpc( - grpc::Status::with_code(grpc::Code::Internal), - HeaderMap::new(), - )); - } - - let req = req.into_inner(); - let (tap, rx) = match req.match_ - .and_then(|m| Tap::new(&m, self.tap_capacity).ok()) - { - Some(m) => m, - None => { - return future::err(grpc::Error::Grpc( - grpc::Status::with_code(grpc::Code::InvalidArgument), - HeaderMap::new(), - )); - } - }; - - let tap_id = match self.taps.lock() { - Ok(mut taps) => { - let tap_id = self.next_id.fetch_add(1, Ordering::AcqRel); - let _ = (*taps).insert(tap_id, tap); - tap_id - } - Err(_) => { - return future::err(grpc::Error::Grpc( - grpc::Status::with_code(grpc::Code::Internal), - HeaderMap::new(), - )); - } - }; - - let events = TapEvents { - rx, - tap_id, - current: IndexMap::default(), - remaining: req.limit as usize, - taps: self.taps.clone(), - }; - - future::ok(Response::new(events)) - } -} - -impl Stream for TapEvents { - type Item = TapEvent; - type Error = grpc::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - loop { - if self.remaining == 0 && self.current.is_empty() { - trace!("tap completed"); - return Ok(None.into()); - } - - let poll: Poll, Self::Error> = - self.rx.poll().or_else(|_| Ok(None.into())); - - trace!("polling; remaining={}; current={}", self.remaining, { - use std::fmt::Write; - let mut s = String::new(); - write!(s, "[").unwrap(); - for id in self.current.keys() { - write!(s, "{},", *id).unwrap(); - } - write!(s, "]").unwrap(); - s - }); - match try_ready!(poll) { - Some(ev) => { - match ev { - Event::StreamRequestOpen(ref req) => { - if self.remaining == 0 { - trace!("exhausted; ignoring req={}", req.id); - continue; - } - trace!("insert req={}", req.id); - self.remaining -= 1; - let _ = self.current.insert(req.id, req.clone()); - } - Event::StreamRequestFail(ref req, _) => { - trace!("fail req={}", req.id); - if self.current.remove(&req.id).is_none() { - warn!("did not exist req={}", req.id); - continue; - } - } - Event::StreamResponseOpen(ref rsp, _) => { - trace!("response req={}", rsp.request.id); - if !self.current.contains_key(&rsp.request.id) { - warn!("did not exist req={}", rsp.request.id); - continue; - } - } - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => { - trace!("end req={}", rsp.request.id); - if self.current.remove(&rsp.request.id).is_none() { - warn!("did not exist req={}", rsp.request.id); - continue; - } - } - ev => { - trace!("ignoring event: {:?}", ev); - continue - } - } - - trace!("emitting tap event: {:?}", ev); - if let Ok(te) = TapEvent::try_from(&ev) { - trace!("emitted tap event"); - // TODO Do limit checks here. - return Ok(Some(te).into()); - } - } - None => { - return Ok(None.into()); - } - } - } - } -} - -impl Drop for TapEvents { - fn drop(&mut self) { - if let Ok(mut taps) = self.taps.lock() { - let _ = (*taps).remove(self.tap_id); - } - } -} diff --git a/src/control/pb.rs b/src/control/pb.rs deleted file mode 100644 index ef674eebc508e..0000000000000 --- a/src/control/pb.rs +++ /dev/null @@ -1,210 +0,0 @@ -#![allow(dead_code)] -#![cfg_attr(feature = "cargo-clippy", allow(clippy))] - -use std::error::Error; -use std::fmt; - -use api::*; -use convert::*; -use proxy; -use tap::event::{self, Event}; - -#[derive(Debug, Clone)] -pub struct UnknownEvent; - -impl fmt::Display for UnknownEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "unknown tap event") - } -} - -impl Error for UnknownEvent { - #[inline] - fn description(&self) -> &str { - "unknown tap event" - } -} - -impl event::StreamResponseEnd { - fn to_tap_event(&self, ctx: &event::Request) -> tap::TapEvent { - let eos = self.grpc_status - .map(tap::Eos::from_grpc_status) - ; - - let end = tap::tap_event::http::ResponseEnd { - id: Some(tap::tap_event::http::StreamId { - base: 0, // TODO FIXME - stream: ctx.id as u64, - }), - since_request_init: Some(pb_elapsed(self.request_open_at, self.response_end_at)), - since_response_init: Some(pb_elapsed(self.response_open_at, self.response_end_at)), - response_bytes: self.bytes_sent, - eos, - }; - - tap::TapEvent { - proxy_direction: ctx.endpoint.direction.as_pb().into(), - source: Some((&ctx.source.remote).into()), - source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.target.addr).into()), - destination_meta: Some(ctx.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::ResponseEnd(end)), - })), - } - } -} - -impl event::StreamResponseFail { - fn to_tap_event(&self, ctx: &event::Request) -> tap::TapEvent { - let end = tap::tap_event::http::ResponseEnd { - id: Some(tap::tap_event::http::StreamId { - base: 0, // TODO FIXME - stream: ctx.id as u64, - }), - since_request_init: Some(pb_elapsed(self.request_open_at, self.response_fail_at)), - since_response_init: Some(pb_elapsed(self.response_open_at, self.response_fail_at)), - response_bytes: self.bytes_sent, - eos: Some(self.error.into()), - }; - - tap::TapEvent { - proxy_direction: ctx.endpoint.direction.as_pb().into(), - source: Some((&ctx.source.remote).into()), - source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.target.addr).into()), - destination_meta: Some(ctx.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::ResponseEnd(end)), - })), - } - } -} - -impl event::StreamRequestFail { - fn to_tap_event(&self, ctx: &event::Request) -> tap::TapEvent { - let end = tap::tap_event::http::ResponseEnd { - id: Some(tap::tap_event::http::StreamId { - base: 0, // TODO FIXME - stream: ctx.id as u64, - }), - since_request_init: Some(pb_elapsed(self.request_open_at, self.request_fail_at)), - since_response_init: None, - response_bytes: 0, - eos: Some(self.error.into()), - }; - - tap::TapEvent { - proxy_direction: ctx.endpoint.direction.as_pb().into(), - source: Some((&ctx.source.remote).into()), - source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.target.addr).into()), - destination_meta: Some(ctx.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::ResponseEnd(end)), - })), - } - } -} - -impl<'a> TryFrom<&'a Event> for tap::TapEvent { - type Err = UnknownEvent; - fn try_from(ev: &'a Event) -> Result { - let tap_ev = match *ev { - Event::StreamRequestOpen(ref ctx) => { - let init = tap::tap_event::http::RequestInit { - id: Some(tap::tap_event::http::StreamId { - base: 0, - // TODO FIXME - stream: ctx.id as u64, - }), - method: Some((&ctx.method).into()), - scheme: ctx.scheme.as_ref().map(http_types::Scheme::from), - authority: ctx.authority.as_ref() - .map(|a| a.as_str()) - .unwrap_or_default() - .into(), - path: ctx.path.clone(), - }; - - tap::TapEvent { - proxy_direction: ctx.endpoint.direction.as_pb().into(), - source: Some((&ctx.source.remote).into()), - source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.target.addr).into()), - destination_meta: Some(ctx.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::RequestInit(init)), - })), - } - } - - Event::StreamResponseOpen(ref ctx, ref rsp) => { - let init = tap::tap_event::http::ResponseInit { - id: Some(tap::tap_event::http::StreamId { - base: 0, - // TODO FIXME - stream: ctx.request.id as u64, - }), - since_request_init: Some(pb_elapsed(rsp.request_open_at, rsp.response_open_at)), - http_status: u32::from(ctx.status.as_u16()), - }; - - tap::TapEvent { - proxy_direction: ctx.request.endpoint.direction.as_pb().into(), - source: Some((&ctx.request.source.remote).into()), - source_meta: Some(ctx.request.source.src_meta()), - destination: Some((&ctx.request.endpoint.target.addr).into()), - destination_meta: Some(ctx.request.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::ResponseInit(init)), - })), - } - } - - Event::StreamRequestFail(ref ctx, ref fail) => { - fail.to_tap_event(&ctx) - } - - Event::StreamResponseEnd(ref ctx, ref end) => { - end.to_tap_event(&ctx.request) - } - - Event::StreamResponseFail(ref ctx, ref fail) => { - fail.to_tap_event(&ctx.request) - } - - _ => return Err(UnknownEvent), - }; - - Ok(tap_ev) - } -} - -impl proxy::Source { - fn src_meta(&self) -> tap::tap_event::EndpointMeta { - let mut meta = tap::tap_event::EndpointMeta::default(); - - meta.labels.insert("tls".to_owned(), format!("{}", self.tls_status)); - - meta - } -} - -impl event::Direction { - fn as_pb(&self) -> tap::tap_event::ProxyDirection { - match self { - event::Direction::Out => tap::tap_event::ProxyDirection::Outbound, - event::Direction::In => tap::tap_event::ProxyDirection::Inbound, - } - } -} - -impl event::Endpoint { - fn dst_meta(&self) -> tap::tap_event::EndpointMeta { - let mut meta = tap::tap_event::EndpointMeta::default(); - meta.labels.extend(self.labels.clone()); - meta.labels.insert("tls".to_owned(), format!("{}", self.target.tls_status())); - meta - } -} diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 6ccd936b7ceab..8d9704ee0132d 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -23,6 +23,12 @@ pub trait HasH2Reason { fn h2_reason(&self) -> Option<::h2::Reason>; } +impl HasH2Reason for ::h2::Error { + fn h2_reason(&self) -> Option<::h2::Reason> { + self.reason() + } +} + impl HasH2Reason for super::buffer::ServiceError { fn h2_reason(&self) -> Option<::h2::Reason> { match self { diff --git a/src/proxy/http/profiles.rs b/src/proxy/http/profiles.rs index 0bddc6e3c79de..a682934effb91 100644 --- a/src/proxy/http/profiles.rs +++ b/src/proxy/http/profiles.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - extern crate tower_discover; use futures::Stream; diff --git a/src/proxy/http/settings.rs b/src/proxy/http/settings.rs index e3ac7775e65ff..3161131237091 100644 --- a/src/proxy/http/settings.rs +++ b/src/proxy/http/settings.rs @@ -191,6 +191,16 @@ pub mod router { } } + impl Clone for Service + where + M: svc::Stack, + M::Value: svc::Service>, + { + fn clone(&self) -> Self { + Self { router: self.router.clone() } + } + } + impl svc::Service> for Service where M: svc::Stack, @@ -212,7 +222,8 @@ pub mod router { } fn call(&mut self, req: http::Request) -> Self::Future { - ResponseFuture { inner: self.router.call(req) } + let inner = self.router.call(req); + ResponseFuture { inner } } } diff --git a/src/tap/ctx.rs b/src/tap/ctx.rs deleted file mode 100644 index d034cb26838e4..0000000000000 --- a/src/tap/ctx.rs +++ /dev/null @@ -1,85 +0,0 @@ -use indexmap::IndexMap; -use http; -use std::sync::{Arc, atomic::AtomicUsize}; -use std::sync::atomic::Ordering; - -use ctx; - - -/// A `RequestId` can be mapped to a `u64`. No `RequestId`s will map to the -/// same value within a process. -/// -/// XXX `usize` is too small except on 64-bit platforms. TODO: Use `u64` when -/// `AtomicU64` becomes stable. -#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] -pub struct RequestId(usize); - -/// Describes a stream's request headers. -#[derive(Debug)] -pub struct Request { - // A numeric ID useful for debugging & correlation. - pub id: RequestId, - - pub uri: http::Uri, - pub method: http::Method, - - /// Identifies the proxy server that received the request. - pub server: Arc, - - /// Identifies the proxy client that dispatched the request. - pub client: Arc, -} - -/// Describes a stream's response headers. -#[derive(Debug)] -pub struct Response { - pub request: Arc, - - pub status: http::StatusCode, -} - -impl RequestId { - fn next() -> Self { - static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(0); - RequestId(NEXT_REQUEST_ID.fetch_add(1, Ordering::SeqCst)) - } -} - -impl Into for RequestId { - fn into(self) -> u64 { - self.0 as u64 - } -} - -impl Request { - pub fn new( - request: &http::Request, - server: &Arc, - client: &Arc, - ) -> Arc { - let r = Self { - id: RequestId::next(), - uri: request.uri().clone(), - method: request.method().clone(), - server: Arc::clone(server), - client: Arc::clone(client), - }; - - Arc::new(r) - } - - pub fn labels(&self) -> &IndexMap { - self.client.labels() - } -} - -impl Response { - pub fn new(response: &http::Response, request: &Arc) -> Arc { - let r = Self { - status: response.status(), - request: Arc::clone(request), - }; - - Arc::new(r) - } -} diff --git a/src/tap/daemon.rs b/src/tap/daemon.rs new file mode 100644 index 0000000000000..9acc8f2f6fe6e --- /dev/null +++ b/src/tap/daemon.rs @@ -0,0 +1,200 @@ +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Poll, Stream}; +use never::Never; + +use super::iface::Tap; + +pub fn new() -> (Daemon, Register, Subscribe) { + let (svc_tx, svc_rx) = mpsc::channel(super::REGISTER_CHANNEL_CAPACITY); + let (tap_tx, tap_rx) = mpsc::channel(super::TAP_CAPACITY); + + let daemon = Daemon { + svc_rx, + svcs: Vec::default(), + + tap_rx, + taps: Vec::default(), + }; + + (daemon, Register(svc_tx), Subscribe(tap_tx)) +} + +/// A background task that connects a tap server and proxy services. +/// +/// The daemon provides `Register` to allow proxy services to listen for new +/// taps; and it provides `Subscribe` to allow the tap server to advertise new +/// taps to proxy services. +#[must_use = "daemon must be polled"] +#[derive(Debug)] +pub struct Daemon { + svc_rx: mpsc::Receiver>, + svcs: Vec>, + + tap_rx: mpsc::Receiver<(T, oneshot::Sender<()>)>, + taps: Vec, +} + +#[derive(Debug)] +pub struct Register(mpsc::Sender>); + +#[derive(Debug)] +pub struct Subscribe(mpsc::Sender<(T, oneshot::Sender<()>)>); + +#[derive(Debug)] +pub struct SubscribeFuture(FutState); + +#[derive(Debug)] +enum FutState { + Subscribe { + tap: Option, + tap_tx: mpsc::Sender<(T, oneshot::Sender<()>)>, + }, + Pending(oneshot::Receiver<()>), +} + +impl Future for Daemon { + type Item = (); + type Error = Never; + + fn poll(&mut self) -> Poll<(), Never> { + // Drop taps that are no longer active (i.e. the response stream has + // been dropped). + let tap_count = self.taps.len(); + self.taps.retain(|t| t.can_tap_more()); + trace!("retained {} of {} taps", self.taps.len(), tap_count); + + // Drop services that are no longer active. + for idx in (0..self.svcs.len()).rev() { + // It's "okay" if a service isn't ready to receive taps. We just + // fall back to being lossy rather than dropping the service + // entirely. + if self.svcs[idx].poll_ready().is_err() { + trace!("removing a service"); + self.svcs.swap_remove(idx); + } + } + + // Connect newly-created services to active taps. + while let Ok(Async::Ready(Some(mut svc))) = self.svc_rx.poll() { + trace!("registering a service"); + + // Notify the service of all active taps. + let mut dropped = false; + for tap in &self.taps { + debug_assert!(!dropped); + + let err = svc.try_send(tap.clone()).err(); + + // If the service has been dropped, make sure that it's not added. + dropped = err.as_ref().map(|e| e.is_disconnected()).unwrap_or(false); + + // If service can't receive any more taps, stop trying. + if err.is_some() { + break; + } + } + + if !dropped { + self.svcs.push(svc); + trace!("service registered"); + } + } + + // Connect newly-created taps to existing services. + while let Ok(Async::Ready(Some((tap, ack)))) = self.tap_rx.poll() { + trace!("subscribing a tap"); + if self.taps.len() == super::TAP_CAPACITY { + warn!("tap capacity exceeded"); + drop(ack); + continue; + } + if !tap.can_tap_more() { + trace!("tap already dropped"); + drop(ack); + continue; + } + + // Notify services of the new tap. If the service has been dropped, + // it's removed from the registry. If it's full, it isn't notified + // of the tap. + for idx in (0..self.svcs.len()).rev() { + let err = self.svcs[idx].try_send(tap.clone()).err(); + if err.map(|e| e.is_disconnected()).unwrap_or(false) { + trace!("removing a service"); + self.svcs.swap_remove(idx); + } + } + + self.taps.push(tap); + let _ = ack.send(()); + trace!("tap subscribed"); + } + + Ok(Async::NotReady) + } +} + +impl Clone for Register { + fn clone(&self) -> Self { + Register(self.0.clone()) + } +} + +impl super::iface::Register for Register { + type Tap = T; + type Taps = mpsc::Receiver; + + fn register(&mut self) -> Self::Taps { + let (tx, rx) = mpsc::channel(super::TAP_CAPACITY); + if let Err(_) = self.0.try_send(tx) { + debug!("failed to register service"); + } + rx + } +} + +impl Clone for Subscribe { + fn clone(&self) -> Self { + Subscribe(self.0.clone()) + } +} + +impl super::iface::Subscribe for Subscribe { + type Future = SubscribeFuture; + + fn subscribe(&mut self, tap: T) -> Self::Future { + SubscribeFuture(FutState::Subscribe { + tap: Some(tap), + tap_tx: self.0.clone(), + }) + } +} + +impl Future for SubscribeFuture { + type Item = (); + type Error = super::iface::NoCapacity; + + fn poll(&mut self) -> Poll<(), Self::Error> { + loop { + self.0 = match self.0 { + FutState::Subscribe { + ref mut tap, + ref mut tap_tx, + } => { + try_ready!(tap_tx.poll_ready().map_err(|_| super::iface::NoCapacity)); + + let tap = tap.take().expect("tap must be set"); + let (tx, rx) = oneshot::channel(); + tap_tx + .try_send((tap, tx)) + .map_err(|_| super::iface::NoCapacity)?; + + FutState::Pending(rx) + } + FutState::Pending(ref mut rx) => { + return rx.poll().map_err(|_| super::iface::NoCapacity); + } + } + } + } +} diff --git a/src/tap/event.rs b/src/tap/event.rs deleted file mode 100644 index dbb9b28aa3b57..0000000000000 --- a/src/tap/event.rs +++ /dev/null @@ -1,85 +0,0 @@ -use h2; -use http; -use indexmap::IndexMap; -use std::time::Instant; - -use proxy::Source; -use transport::connect; - -// TODO this should be replaced with a string name. -#[derive(Copy, Clone, Debug)] -pub enum Direction { In, Out } - -#[derive(Clone, Debug)] -pub struct Endpoint { - pub direction: Direction, - pub target: connect::Target, - pub labels: IndexMap, -} - -#[derive(Clone, Debug)] -pub struct Request { - pub id: usize, - pub source: Source, - pub endpoint: Endpoint, - pub method: http::Method, - pub scheme: Option, - pub authority: Option, - pub path: String, -} - -#[derive(Clone, Debug)] -pub struct Response { - pub request: Request, - pub status: http::StatusCode, -} - -#[derive(Clone, Debug)] -pub enum Event { - StreamRequestOpen(Request), - StreamRequestFail(Request, StreamRequestFail), - StreamRequestEnd(Request, StreamRequestEnd), - - StreamResponseOpen(Response, StreamResponseOpen), - StreamResponseFail(Response, StreamResponseFail), - StreamResponseEnd(Response, StreamResponseEnd), -} - -#[derive(Clone, Debug)] -pub struct StreamRequestFail { - pub request_open_at: Instant, - pub request_fail_at: Instant, - pub error: h2::Reason, -} - -#[derive(Clone, Debug)] -pub struct StreamRequestEnd { - pub request_open_at: Instant, - pub request_end_at: Instant, -} - -#[derive(Clone, Debug)] -pub struct StreamResponseOpen { - pub request_open_at: Instant, - pub response_open_at: Instant, -} - -#[derive(Clone, Debug)] -pub struct StreamResponseFail { - pub request_open_at: Instant, - pub response_open_at: Instant, - pub response_first_frame_at: Option, - pub response_fail_at: Instant, - pub error: h2::Reason, - pub bytes_sent: u64, -} - -#[derive(Clone, Debug)] -pub struct StreamResponseEnd { - pub request_open_at: Instant, - pub response_open_at: Instant, - pub response_first_frame_at: Instant, - pub response_end_at: Instant, - pub grpc_status: Option, - pub bytes_sent: u64, -} diff --git a/src/tap/grpc/match_.rs b/src/tap/grpc/match_.rs new file mode 100644 index 0000000000000..0b123ff13eae6 --- /dev/null +++ b/src/tap/grpc/match_.rs @@ -0,0 +1,471 @@ +use http; +use indexmap::IndexMap; +use ipnet::{Contains, Ipv4Net, Ipv6Net}; +use std::boxed::Box; +use std::net; +use std::{error, fmt}; + +use api::net::ip_address; +use api::tap::observe_request; +use convert::TryFrom; + +use tap::Inspect; + +#[derive(Clone, Debug)] +pub enum Match { + Any(Vec), + All(Vec), + Not(Box), + Source(TcpMatch), + Destination(TcpMatch), + DestinationLabel(LabelMatch), + Http(HttpMatch), +} + +#[derive(Debug, Eq, PartialEq)] +pub enum InvalidMatch { + Empty, + InvalidPort, + InvalidNetwork, + InvalidHttpMethod, + InvalidScheme, +} + +#[derive(Clone, Debug)] +pub struct LabelMatch { + key: String, + value: String, +} + +#[derive(Clone, Debug)] +pub enum TcpMatch { + // Inclusive + PortRange(u16, u16), + Net(NetMatch), +} + +#[derive(Clone, Debug)] +pub enum NetMatch { + Net4(Ipv4Net), + Net6(Ipv6Net), +} + +#[derive(Clone, Debug)] +pub enum HttpMatch { + Scheme(http::uri::Scheme), + Method(http::Method), + Path(observe_request::match_::http::string_match::Match), + Authority(observe_request::match_::http::string_match::Match), +} + +// ===== impl Match ====== + +impl Match { + fn from_seq(seq: observe_request::match_::Seq) -> Result, InvalidMatch> { + let mut new = Vec::with_capacity(seq.matches.len()); + for m in seq.matches.into_iter().filter_map(|m| m.match_) { + new.push(Self::try_from(m)?); + } + + Ok(new) + } + + pub fn matches(&self, req: &http::Request, inspect: &I) -> bool { + match self { + Match::Any(ref ms) => ms.iter().any(|m| m.matches(req, inspect)), + Match::All(ref ms) => ms.iter().all(|m| m.matches(req, inspect)), + Match::Not(ref not) => !not.matches(req, inspect), + Match::Source(ref src) => inspect + .src_addr(req) + .map(|s| src.matches(s)) + .unwrap_or(false), + Match::Destination(ref dst) => inspect + .dst_addr(req) + .map(|d| dst.matches(d)) + .unwrap_or(false), + Match::DestinationLabel(ref lbl) => inspect + .dst_labels(req) + .map(|l| lbl.matches(l)) + .unwrap_or(false), + Match::Http(ref http) => http.matches(req, inspect), + } + } +} + +impl Match { + pub fn try_new(m: Option) -> Result { + m.and_then(|m| m.match_) + .map(Self::try_from) + .unwrap_or_else(|| Err(InvalidMatch::Empty)) + } +} + +impl TryFrom for Match { + type Err = InvalidMatch; + + #[allow(unconditional_recursion)] + fn try_from(m: observe_request::match_::Match) -> Result { + use api::tap::observe_request::match_; + + match m { + match_::Match::All(seq) => Self::from_seq(seq).map(Match::All), + match_::Match::Any(seq) => Self::from_seq(seq).map(Match::Any), + match_::Match::Not(m) => m + .match_ + .ok_or(InvalidMatch::Empty) + .and_then(Self::try_from) + .map(|m| Match::Not(Box::new(m))), + match_::Match::Source(src) => TcpMatch::try_from(src).map(Match::Source), + match_::Match::Destination(dst) => TcpMatch::try_from(dst).map(Match::Destination), + match_::Match::DestinationLabel(l) => { + LabelMatch::try_from(l).map(Match::DestinationLabel) + } + match_::Match::Http(http) => HttpMatch::try_from(http).map(Match::Http), + } + } +} + +// ===== impl LabelMatch ====== + +impl LabelMatch { + fn matches(&self, labels: &IndexMap) -> bool { + labels.get(&self.key) == Some(&self.value) + } +} + +impl TryFrom for LabelMatch { + type Err = InvalidMatch; + + fn try_from(m: observe_request::match_::Label) -> Result { + if m.key.is_empty() || m.value.is_empty() { + return Err(InvalidMatch::Empty); + } + + Ok(LabelMatch { + key: m.key.clone(), + value: m.value.clone(), + }) + } +} + +// ===== impl TcpMatch ====== + +impl TcpMatch { + fn matches(&self, addr: net::SocketAddr) -> bool { + match self { + // If either a minimum or maximum is not specified, the range is considered to + // be over a discrete value. + TcpMatch::PortRange(min, max) => *min <= addr.port() && addr.port() <= *max, + TcpMatch::Net(net) => net.matches(&addr.ip()), + } + } +} + +impl TryFrom for TcpMatch { + type Err = InvalidMatch; + + fn try_from(m: observe_request::match_::Tcp) -> Result { + use api::tap::observe_request::match_::tcp; + + m.match_.ok_or(InvalidMatch::Empty).and_then(|t| match t { + tcp::Match::Ports(range) => { + // If either a minimum or maximum is not specified, the range is considered to + // be over a discrete value. + let min = if range.min == 0 { range.max } else { range.min }; + let max = if range.max == 0 { range.min } else { range.max }; + if min == 0 || max == 0 { + return Err(InvalidMatch::Empty); + } + if min > u32::from(::std::u16::MAX) || max > u32::from(::std::u16::MAX) { + return Err(InvalidMatch::InvalidPort); + } + Ok(TcpMatch::PortRange(min as u16, max as u16)) + } + + tcp::Match::Netmask(netmask) => NetMatch::try_from(netmask).map(TcpMatch::Net), + }) + } +} + +// ===== impl NetMatch ====== + +impl NetMatch { + fn matches(&self, addr: &net::IpAddr) -> bool { + match self { + NetMatch::Net4(net) => match addr { + net::IpAddr::V6(_) => false, + net::IpAddr::V4(addr) => net.contains(addr), + }, + NetMatch::Net6(net) => match addr { + net::IpAddr::V4(_) => false, + net::IpAddr::V6(addr) => net.contains(addr), + }, + } + } +} + +impl TryFrom for NetMatch { + type Err = InvalidMatch; + + fn try_from(m: observe_request::match_::tcp::Netmask) -> Result { + let mask = if m.mask == 0 { + return Err(InvalidMatch::Empty); + } else if m.mask > u32::from(::std::u8::MAX) { + return Err(InvalidMatch::InvalidNetwork); + } else { + m.mask as u8 + }; + + let net = match m.ip.and_then(|a| a.ip).ok_or(InvalidMatch::Empty)? { + ip_address::Ip::Ipv4(n) => { + let ip = n.into(); + let net = Ipv4Net::new(ip, mask).map_err(|_| InvalidMatch::InvalidNetwork)?; + NetMatch::Net4(net) + } + ip_address::Ip::Ipv6(n) => { + let ip = (&n).into(); + let net = Ipv6Net::new(ip, mask).map_err(|_| InvalidMatch::InvalidNetwork)?; + NetMatch::Net6(net) + } + }; + + Ok(net) + } +} + +// ===== impl HttpMatch ====== + +impl HttpMatch { + fn matches(&self, req: &http::Request, inspect: &I) -> bool { + match self { + HttpMatch::Scheme(ref m) => { + m == req.uri().scheme_part().unwrap_or(&http::uri::Scheme::HTTP) + } + + HttpMatch::Method(ref m) => m == req.method(), + + HttpMatch::Authority(ref m) => inspect + .authority(req) + .map(|a| Self::matches_string(m, &a)) + .unwrap_or(false), + + HttpMatch::Path(ref m) => Self::matches_string(m, req.uri().path()), + } + } + + fn matches_string( + string_match: &observe_request::match_::http::string_match::Match, + value: &str, + ) -> bool { + use api::tap::observe_request::match_::http::string_match::Match::*; + + match string_match { + Exact(ref exact) => value == exact, + Prefix(ref prefix) => value.starts_with(prefix), + } + } +} + +impl TryFrom for HttpMatch { + type Err = InvalidMatch; + fn try_from(m: observe_request::match_::Http) -> Result { + use api::http_types::scheme::{Registered, Type}; + use api::tap::observe_request::match_::http::Match as Pb; + + m.match_.ok_or(InvalidMatch::Empty).and_then(|m| match m { + Pb::Scheme(s) => s.type_.ok_or(InvalidMatch::Empty).and_then(|s| match s { + Type::Registered(reg) if reg == Registered::Http.into() => { + Ok(HttpMatch::Scheme(http::uri::Scheme::HTTP)) + } + Type::Registered(reg) if reg == Registered::Https.into() => { + Ok(HttpMatch::Scheme(http::uri::Scheme::HTTPS)) + } + Type::Registered(_) => Err(InvalidMatch::InvalidScheme), + Type::Unregistered(ref s) => http::uri::Scheme::from_shared(s.as_str().into()) + .map(HttpMatch::Scheme) + .map_err(|_| InvalidMatch::InvalidScheme), + }), + + Pb::Method(m) => m + .type_ + .ok_or(InvalidMatch::Empty) + .and_then(|m| m.try_as_http().map_err(|_| InvalidMatch::InvalidHttpMethod)) + .map(HttpMatch::Method), + + Pb::Authority(a) => a + .match_ + .ok_or(InvalidMatch::Empty) + .map(|a| HttpMatch::Authority(a)), + + Pb::Path(p) => p + .match_ + .ok_or(InvalidMatch::Empty) + .map(|p| HttpMatch::Path(p)), + }) + } +} + +#[cfg(test)] +mod tests { + use ipnet::{Contains, Ipv4Net, Ipv6Net}; + use quickcheck::*; + use std::collections::HashMap; + + use super::*; + use api::http_types; + + impl Arbitrary for LabelMatch { + fn arbitrary(g: &mut G) -> Self { + Self { + key: Arbitrary::arbitrary(g), + value: Arbitrary::arbitrary(g), + } + } + } + + impl Arbitrary for TcpMatch { + fn arbitrary(g: &mut G) -> Self { + if g.gen::() { + TcpMatch::Net(NetMatch::arbitrary(g)) + } else { + TcpMatch::PortRange(g.gen(), g.gen()) + } + } + } + + impl Arbitrary for NetMatch { + fn arbitrary(g: &mut G) -> Self { + if g.gen::() { + let addr = net::Ipv4Addr::arbitrary(g); + let bits = g.gen::() % 32; + let net = Ipv4Net::new(addr, bits).expect("ipv4 network address"); + NetMatch::Net4(net) + } else { + let addr = net::Ipv6Addr::arbitrary(g); + let bits = g.gen::() % 128; + let net = Ipv6Net::new(addr, bits).expect("ipv6 network address"); + NetMatch::Net6(net) + } + } + } + + quickcheck! { + fn tcp_from_proto(tcp: observe_request::match_::Tcp) -> bool { + use self::observe_request::match_::tcp; + + let err: Option = + tcp.match_.as_ref() + .map(|m| match m { + tcp::Match::Ports(ps) => { + let ok = 0 < ps.min && + ps.min <= ps.max && + ps.max < u32::from(::std::u16::MAX); + if ok { None } else { Some(InvalidMatch::InvalidPort) } + } + tcp::Match::Netmask(n) => { + match n.ip.as_ref().and_then(|ip| ip.ip.as_ref()) { + Some(_) => None, + None => Some(InvalidMatch::Empty), + } + } + }) + .unwrap_or(Some(InvalidMatch::Empty)); + + err == TcpMatch::try_from(tcp).err() + } + + fn tcp_matches(m: TcpMatch, addr: net::SocketAddr) -> bool { + let matches = match (&m, addr.ip()) { + (&TcpMatch::Net(NetMatch::Net4(ref n)), net::IpAddr::V4(ip)) => { + n.contains(&ip) + } + (&TcpMatch::Net(NetMatch::Net6(ref n)), net::IpAddr::V6(ip)) => { + n.contains(&ip) + } + (&TcpMatch::PortRange(min, max), _) => { + min <= addr.port() && addr.port() <= max + } + _ => false + }; + + m.matches(addr) == matches + } + + fn labels_from_proto(label: observe_request::match_::Label) -> bool { + let err: Option = + if label.key.is_empty() || label.value.is_empty() { + Some(InvalidMatch::Empty) + } else { + None + }; + + err == LabelMatch::try_from(label).err() + } + + fn label_matches(l: LabelMatch, labels: HashMap) -> bool { + use std::iter::FromIterator; + + let matches = labels.get(&l.key) == Some(&l.value); + l.matches(&IndexMap::from_iter(labels.into_iter())) == matches + } + + fn http_from_proto(http: observe_request::match_::Http) -> bool { + use self::observe_request::match_::http; + + let err = match http.match_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(http::Match::Method(ref m)) => { + match m.type_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(http_types::http_method::Type::Unregistered(ref m)) if m.len() > 15 => { + Some(InvalidMatch::InvalidHttpMethod) + } + Some(http_types::http_method::Type::Unregistered(m)) => { + ::http::Method::from_bytes(m.as_bytes()) + .err() + .map(|_| InvalidMatch::InvalidHttpMethod) + } + Some(http_types::http_method::Type::Registered(m)) if *m >= 9 => { + Some(InvalidMatch::InvalidHttpMethod) + } + Some(http_types::http_method::Type::Registered(_)) => None, + } + } + Some(http::Match::Scheme(m)) => match m.type_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(http_types::scheme::Type::Unregistered(_)) => None, + Some(http_types::scheme::Type::Registered(m)) if *m < 2 => None, + Some(http_types::scheme::Type::Registered(_)) => Some(InvalidMatch::InvalidScheme), + } + Some(http::Match::Authority(m)) => match m.match_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(_) => None, + } + Some(http::Match::Path(m)) => match m.match_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(_) => None, + } + }; + + err == HttpMatch::try_from(http).err() + } + } +} + +impl fmt::Display for InvalidMatch { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}", + match self { + InvalidMatch::Empty => "missing required field", + InvalidMatch::InvalidPort => "invalid port number", + InvalidMatch::InvalidNetwork => "invalid network address", + InvalidMatch::InvalidHttpMethod => "invalid http method", + InvalidMatch::InvalidScheme => "invalid request scheme", + } + ) + } +} + +impl error::Error for InvalidMatch {} diff --git a/src/tap/grpc/mod.rs b/src/tap/grpc/mod.rs new file mode 100644 index 0000000000000..e179f0ba7efb7 --- /dev/null +++ b/src/tap/grpc/mod.rs @@ -0,0 +1,4 @@ +mod match_; +mod server; + +pub use self::server::{Server, Tap}; diff --git a/src/tap/grpc/server.rs b/src/tap/grpc/server.rs new file mode 100644 index 0000000000000..408b0045e4e38 --- /dev/null +++ b/src/tap/grpc/server.rs @@ -0,0 +1,549 @@ +use bytes::Buf; +use futures::sync::{mpsc, oneshot}; +use futures::{future, Async, Future, Poll, Stream}; +use http::HeaderMap; +use hyper::body::Payload; +use never::Never; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Weak}; +use std::time::Instant; +use tokio_timer::clock; +use tower_grpc::{self as grpc, Response}; + +use api::{http_types, pb_duration, tap as api}; + +use super::match_::Match; +use proxy::http::HasH2Reason; +use tap::{iface, Inspect}; + +#[derive(Clone, Debug)] +pub struct Server { + subscribe: T, + base_id: Arc, +} + +#[derive(Debug)] +pub struct ResponseFuture { + subscribe: F, + dispatch: Option, + events_rx: Option>, +} + +#[derive(Debug)] +pub struct ResponseStream { + dispatch: Option, + events_rx: mpsc::Receiver, +} + +#[derive(Debug)] +struct Dispatch { + base_id: u32, + count: usize, + limit: usize, + taps_rx: mpsc::Receiver>, + events_tx: mpsc::Sender, + match_handle: Arc, +} + +#[derive(Clone, Debug)] +struct TapTx { + id: api::tap_event::http::StreamId, + tx: mpsc::Sender, +} + +#[derive(Clone, Debug)] +pub struct Tap { + match_: Weak, + taps_tx: mpsc::Sender>, +} + +#[derive(Debug)] +pub struct TapFuture(FutState); + +#[derive(Debug)] +enum FutState { + Init { + request_init_at: Instant, + taps_tx: mpsc::Sender>, + }, + Pending { + request_init_at: Instant, + rx: oneshot::Receiver, + }, +} + +#[derive(Debug)] +pub struct TapRequest { + request_init_at: Instant, + tap: TapTx, +} + +#[derive(Debug)] +pub struct TapResponse { + base_event: api::TapEvent, + request_init_at: Instant, + tap: TapTx, +} + +#[derive(Debug)] +pub struct TapRequestPayload { + base_event: api::TapEvent, + tap: TapTx, +} + +#[derive(Debug)] +pub struct TapResponsePayload { + base_event: api::TapEvent, + request_init_at: Instant, + response_init_at: Instant, + response_bytes: usize, + tap: TapTx, +} + +// === impl Server === + +impl> Server { + pub(in tap) fn new(subscribe: T) -> Self { + let base_id = Arc::new(0.into()); + Self { base_id, subscribe } + } + + fn invalid_arg(event: http::header::HeaderValue) -> grpc::Error { + let status = grpc::Status::with_code(grpc::Code::InvalidArgument); + let mut headers = HeaderMap::new(); + headers.insert("grpc-message", event); + grpc::Error::Grpc(status, headers) + } +} + +impl api::server::Tap for Server +where + T: iface::Subscribe + Clone, +{ + type ObserveStream = ResponseStream; + type ObserveFuture = future::Either< + future::FutureResult, grpc::Error>, + ResponseFuture, + >; + + fn observe(&mut self, req: grpc::Request) -> Self::ObserveFuture { + let req = req.into_inner(); + + let limit = req.limit as usize; + if limit == 0 { + let v = http::header::HeaderValue::from_static("limit must be positive"); + return future::Either::A(future::err(Self::invalid_arg(v))); + }; + trace!("tap: limit={}", limit); + + // Read the match logic into a type we can use to evaluate against + // requests. This match will be shared (weakly) by all registered + // services to match requests. The response stream strongly holds the + // match until the response is complete. This way, services never + // evaluate matches for taps that have been completed or canceled. + let match_handle = match Match::try_new(req.match_) { + Ok(m) => Arc::new(m), + Err(e) => { + warn!("invalid tap request: {} ", e); + let v = format!("{}", e) + .parse() + .unwrap_or_else(|_| http::header::HeaderValue::from_static("invalid message")); + return future::Either::A(future::err(Self::invalid_arg(v))); + } + }; + + // Wrapping is okay. This is realy just to disambiguate events within a + // single tap session (i.e. that may consist of several tap requests). + let base_id = self.base_id.fetch_add(1, Ordering::AcqRel) as u32; + debug!("tap; id={}; match={:?}", base_id, match_handle); + + // The taps channel is used by services to acquire a `TapTx` for + // `Dispatch`, i.e. ensuring that no more than the requested number of + // taps are executed. + // + // The read side of this channel (held by `dispatch`) is dropped by the + // `ResponseStream` once the `limit` has been reached. This is dropped + // with the strong reference to `match_` so that services can determine + // when a Tap should be dropped. + // + // The response stream continues to process events for open streams + // until all streams have been completed. + let (taps_tx, taps_rx) = mpsc::channel(super::super::TAP_CAPACITY); + + let tap = Tap::new(Arc::downgrade(&match_handle), taps_tx); + let subscribe = self.subscribe.subscribe(tap); + + // The events channel is used to emit tap events to the response stream. + // + // At most `limit` copies of `events_tx` are dispatched to `taps_rx` + // requests. Each tapped request's sender is dropped when the response + // completes, so the event stream closes gracefully when all tapped + // requests are completed without additional coordination. + let (events_tx, events_rx) = + mpsc::channel(super::super::PER_RESPONSE_EVENT_BUFFER_CAPACITY); + + // Reads up to `limit` requests from from `taps_rx` and satisfies them + // with a cpoy of `events_tx`. + let dispatch = Dispatch { + base_id, + count: 0, + limit, + taps_rx, + events_tx, + match_handle, + }; + + future::Either::B(ResponseFuture { + subscribe, + dispatch: Some(dispatch), + events_rx: Some(events_rx), + }) + } +} + +impl> Future for ResponseFuture { + type Item = Response; + type Error = grpc::Error; + + fn poll(&mut self) -> Poll { + // Ensure that tap registers successfully. + match self.subscribe.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => {} + Err(_) => { + let status = grpc::Status::with_code(grpc::Code::ResourceExhausted); + let mut headers = HeaderMap::new(); + headers.insert("grpc-message", "Too many active taps".parse().unwrap()); + return Err(grpc::Error::Grpc(status, headers)); + } + } + + let rsp = ResponseStream { + dispatch: self.dispatch.take(), + events_rx: self.events_rx.take().expect("events_rx must be set"), + }; + + Ok(Response::new(rsp).into()) + } +} + +// === impl ResponseStream === + +impl Stream for ResponseStream { + type Item = api::TapEvent; + type Error = grpc::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // Drop the dispatch future once it completes so that services do not do + // any more matching against this tap. + // + // Furthermore, this drops the event sender so that `events_rx` closes + // gracefully when all open taps are complete. + self.dispatch = self.dispatch.take().and_then(|mut d| match d.poll() { + Ok(Async::NotReady) => Some(d), + Ok(Async::Ready(())) | Err(_) => None, + }); + + // Read events from taps. The receiver can't actually error, but we need + // to satisfy the type signature, so we coerce errors into EOS. + self.events_rx.poll().or_else(|_| Ok(None.into())) + } +} + +// === impl Dispatch === + +impl Future for Dispatch { + type Item = (); + type Error = (); + + /// Read tap requests, assign each an ID and send it back to the requesting + /// service with an event sender so that it may emittap events. + /// + /// Becomes ready when the limit has been reached. + fn poll(&mut self) -> Poll<(), Self::Error> { + while let Some(tx) = try_ready!(self.taps_rx.poll().map_err(|_| ())) { + debug_assert!(self.count < self.limit - 1); + + self.count += 1; + let tap = TapTx { + tx: self.events_tx.clone(), + id: api::tap_event::http::StreamId { + base: self.base_id, + stream: self.count as u64, + }, + }; + if tx.send(tap).is_err() { + // If the tap isn't sent, then restore the count. + self.count -= 1; + } + + if self.count == self.limit - 1 { + return Ok(Async::Ready(())); + } + } + + Ok(Async::Ready(())) + } +} + +// === impl Tap === + +impl Tap { + fn new(match_: Weak, taps_tx: mpsc::Sender>) -> Self { + Self { match_, taps_tx } + } +} + +impl iface::Tap for Tap { + type TapRequest = TapRequest; + type TapRequestPayload = TapRequestPayload; + type TapResponse = TapResponse; + type TapResponsePayload = TapResponsePayload; + type Future = TapFuture; + + fn can_tap_more(&self) -> bool { + self.match_.upgrade().is_some() + } + + fn should_tap(&self, req: &http::Request, inspect: &I) -> bool { + self.match_ + .upgrade() + .map(|m| m.matches(req, inspect)) + .unwrap_or(false) + } + + fn tap(&mut self) -> Self::Future { + TapFuture(FutState::Init { + request_init_at: clock::now(), + taps_tx: self.taps_tx.clone(), + }) + } +} + +// === impl TapFuture === + +impl Future for TapFuture { + type Item = Option; + type Error = Never; + + fn poll(&mut self) -> Poll { + loop { + self.0 = match self.0 { + FutState::Init { + ref request_init_at, + ref mut taps_tx, + } => { + match taps_tx.poll_ready() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => {} + Err(_) => return Ok(Async::Ready(None)), + } + let (tx, rx) = oneshot::channel(); + + // If this fails, polling `rx` will fail below. + let _ = taps_tx.try_send(tx); + + FutState::Pending { + request_init_at: *request_init_at, + rx, + } + } + FutState::Pending { + ref request_init_at, + ref mut rx, + } => { + return match rx.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(tap)) => { + let t = TapRequest { + request_init_at: *request_init_at, + tap, + }; + Ok(Some(t).into()) + } + Err(_) => Ok(None.into()), + }; + } + } + } + } +} + +// === impl TapRequest === + +impl iface::TapRequest for TapRequest { + type TapPayload = TapRequestPayload; + type TapResponse = TapResponse; + type TapResponsePayload = TapResponsePayload; + + fn open( + mut self, + req: &http::Request, + inspect: &I, + ) -> (TapRequestPayload, TapResponse) { + let base_event = base_event(req, inspect); + + let init = api::tap_event::http::RequestInit { + id: Some(self.tap.id.clone()), + method: Some(req.method().into()), + scheme: req.uri().scheme_part().map(http_types::Scheme::from), + authority: inspect.authority(req).unwrap_or_default(), + path: req.uri().path().into(), + }; +; + let event = api::TapEvent { + event: Some(api::tap_event::Event::Http(api::tap_event::Http { + event: Some(api::tap_event::http::Event::RequestInit(init)), + })), + ..base_event.clone() + }; + let _ = self.tap.tx.try_send(event); + + let req = TapRequestPayload { + tap: self.tap.clone(), + base_event: base_event.clone(), + }; + let rsp = TapResponse { + tap: self.tap, + base_event, + request_init_at: self.request_init_at, + }; + (req, rsp) + } +} + +// === impl TapResponse === + +impl iface::TapResponse for TapResponse { + type TapPayload = TapResponsePayload; + + fn tap(mut self, rsp: &http::Response) -> TapResponsePayload { + let response_init_at = clock::now(); + let init = api::tap_event::http::Event::ResponseInit(api::tap_event::http::ResponseInit { + id: Some(self.tap.id.clone()), + since_request_init: Some(pb_duration(response_init_at - self.request_init_at)), + http_status: rsp.status().as_u16().into(), + }); + + let event = api::TapEvent { + event: Some(api::tap_event::Event::Http(api::tap_event::Http { + event: Some(init), + })), + ..self.base_event.clone() + }; + let _ = self.tap.tx.try_send(event); + + TapResponsePayload { + base_event: self.base_event, + request_init_at: self.request_init_at, + response_init_at, + response_bytes: 0, + tap: self.tap, + } + } + + fn fail(mut self, err: &E) { + let response_end_at = clock::now(); + let reason = err.h2_reason(); + let end = api::tap_event::http::Event::ResponseEnd(api::tap_event::http::ResponseEnd { + id: Some(self.tap.id.clone()), + since_request_init: Some(pb_duration(response_end_at - self.request_init_at)), + since_response_init: None, + response_bytes: 0, + eos: Some(api::Eos { + end: reason.map(|r| api::eos::End::ResetErrorCode(r.into())), + }), + }); + + let event = api::TapEvent { + event: Some(api::tap_event::Event::Http(api::tap_event::Http { + event: Some(end), + })), + ..self.base_event + }; + let _ = self.tap.tx.try_send(event); + } +} + +// === impl TapRequestPayload === + +impl iface::TapPayload for TapRequestPayload { + fn data(&mut self, _: &B) {} + + fn eos(self, _: Option<&http::HeaderMap>) {} + + fn fail(self, _: &E) {} +} + +// === impl TapResponsePayload === + +impl iface::TapPayload for TapResponsePayload { + fn data(&mut self, data: &B) { + self.response_bytes += data.remaining(); + } + + fn eos(self, trls: Option<&http::HeaderMap>) { + let end = trls + .and_then(|t| t.get("grpc-status")) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .map(api::eos::End::GrpcStatusCode); + + self.send(end); + } + + fn fail(self, e: &E) { + let end = e.h2_reason().map(|r| api::eos::End::ResetErrorCode(r.into())); + self.send(end); + } +} + +impl TapResponsePayload { + fn send(mut self, end: Option) { + let response_end_at = clock::now(); + let end = api::tap_event::http::ResponseEnd { + id: Some(self.tap.id), + since_request_init: Some(pb_duration(response_end_at - self.request_init_at)), + since_response_init: Some(pb_duration(response_end_at - self.response_init_at)), + response_bytes: self.response_bytes as u64, + eos: Some(api::Eos { end }), + }; + + let event = api::TapEvent { + event: Some(api::tap_event::Event::Http(api::tap_event::Http { + event: Some(api::tap_event::http::Event::ResponseEnd(end)), + })), + ..self.base_event + }; + let _ = self.tap.tx.try_send(event); + } +} + +// All of the events emitted from tap have a common set of metadata. +// Build this once, without an `event`, so that it can be used to build +// each HTTP event. +fn base_event(req: &http::Request, inspect: &I) -> api::TapEvent { + api::TapEvent { + proxy_direction: if inspect.is_outbound(req) { + api::tap_event::ProxyDirection::Outbound.into() + } else { + api::tap_event::ProxyDirection::Inbound.into() + }, + source: inspect.src_addr(req).as_ref().map(|a| a.into()), + source_meta: { + let mut m = api::tap_event::EndpointMeta::default(); + let tls = format!("{}", inspect.src_tls(req)); + m.labels.insert("tls".to_owned(), tls); + Some(m) + }, + destination: inspect.dst_addr(req).as_ref().map(|a| a.into()), + destination_meta: inspect.dst_labels(req).map(|labels| { + let mut m = api::tap_event::EndpointMeta::default(); + m.labels.extend(labels.clone()); + let tls = format!("{}", inspect.dst_tls(req)); + m.labels.insert("tls".to_owned(), tls); + m + }), + event: None, + } +} diff --git a/src/tap/match_.rs b/src/tap/match_.rs deleted file mode 100644 index c4da4149a4130..0000000000000 --- a/src/tap/match_.rs +++ /dev/null @@ -1,543 +0,0 @@ -use indexmap::IndexMap; -use std::boxed::Box; -use std::net; - -use http; -use ipnet::{Contains, Ipv4Net, Ipv6Net}; - -use super::{event, Event}; -use api::net::ip_address; -use api::tap::observe_request; -use convert::TryFrom; - -#[derive(Clone, Debug)] -pub(super) enum Match { - Any(Vec), - All(Vec), - Not(Box), - Source(TcpMatch), - Destination(TcpMatch), - DestinationLabel(LabelMatch), - Http(HttpMatch), -} - -#[derive(Eq, PartialEq)] -pub enum InvalidMatch { - Empty, - InvalidPort, - InvalidNetwork, - InvalidHttpMethod, - InvalidScheme, - Unimplemented, -} - -#[derive(Clone, Debug)] -pub(super) struct LabelMatch { - key: String, - value: String, -} - -#[derive(Clone, Debug)] -pub(super) enum TcpMatch { - // Inclusive - PortRange(u16, u16), - Net(NetMatch), -} - -#[derive(Clone, Debug)] -pub(super) enum NetMatch { - Net4(Ipv4Net), - Net6(Ipv6Net), -} - -#[derive(Clone, Debug)] -pub(super) enum HttpMatch { - Scheme(String), - Method(http::Method), - Path(observe_request::match_::http::string_match::Match), - Authority(observe_request::match_::http::string_match::Match), -} - -// ===== impl Match ====== - -impl Match { - pub(super) fn matches(&self, ev: &Event) -> bool { - match *self { - Match::Any(ref any) => { - for m in any { - if m.matches(ev) { - return true; - } - } - false - } - - Match::All(ref all) => { - for m in all { - if !m.matches(ev) { - return false; - } - } - true - } - - Match::Not(ref not) => !not.matches(ev), - - Match::Source(ref src) => match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - src.matches(&req.source.remote) - } - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => src.matches(&rsp.request.source.remote), - _ => false, - }, - - Match::Destination(ref dst) => match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - dst.matches(&req.endpoint.target.addr) - } - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => - dst.matches(&rsp.request.endpoint.target.addr), - _ => false, - }, - - Match::DestinationLabel(ref label) => match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - label.matches(&req.endpoint.labels) - } - - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => { - label.matches(&rsp.request.endpoint.labels) - } - - _ => false, - } - - Match::Http(ref http) => match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - http.matches(req) - } - - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => http.matches(&rsp.request), - - _ => false, - }, - } - } - - pub(super) fn new(match_: &observe_request::Match) -> Result { - match_ - .match_ - .as_ref() - .map(Match::try_from) - .unwrap_or_else(|| Err(InvalidMatch::Empty)) - } - - fn from_seq(seq: &observe_request::match_::Seq) -> Result, InvalidMatch> { - let mut new = Vec::with_capacity(seq.matches.len()); - - for m in &seq.matches { - if let Some(m) = m.match_.as_ref() { - new.push(Self::try_from(m)?); - } - } - - Ok(new) - } -} - -impl<'a> TryFrom<&'a observe_request::match_::Match> for Match { - type Err = InvalidMatch; - - #[allow(unconditional_recursion)] - fn try_from(m: &observe_request::match_::Match) -> Result { - use api::tap::observe_request::match_; - - let match_ = match *m { - match_::Match::All(ref seq) => Match::All(Self::from_seq(seq)?), - - match_::Match::Any(ref seq) => Match::Any(Self::from_seq(seq)?), - - match_::Match::Not(ref m) => match m.match_.as_ref() { - Some(m) => Match::Not(Box::new(Self::try_from(m)?)), - None => return Err(InvalidMatch::Empty), - }, - - match_::Match::Source(ref src) => Match::Source(TcpMatch::try_from(src)?), - - match_::Match::Destination(ref dst) => Match::Destination(TcpMatch::try_from(dst)?), - - match_::Match::DestinationLabel(ref label) => { - Match::DestinationLabel(LabelMatch::try_from(label)?) - } - - match_::Match::Http(ref http) => Match::Http(HttpMatch::try_from(http)?), - }; - - Ok(match_) - } -} - -// ===== impl LabelMatch ====== - -impl LabelMatch { - fn matches(&self, labels: &IndexMap) -> bool { - labels.get(&self.key) == Some(&self.value) - } -} - -impl<'a> TryFrom<&'a observe_request::match_::Label> for LabelMatch { - type Err = InvalidMatch; - - fn try_from(m: &observe_request::match_::Label) -> Result { - if m.key.is_empty() || m.value.is_empty() { - return Err(InvalidMatch::Empty); - } - - Ok(LabelMatch { - key: m.key.clone(), - value: m.value.clone(), - }) - } -} - -// ===== impl TcpMatch ====== - -impl TcpMatch { - fn matches(&self, addr: &net::SocketAddr) -> bool { - match *self { - // If either a minimum or maximum is not specified, the range is considered to - // be over a discrete value. - TcpMatch::PortRange(min, max) => min <= addr.port() && addr.port() <= max, - - TcpMatch::Net(ref net) => net.matches(&addr.ip()), - } - } -} - -impl<'a> TryFrom<&'a observe_request::match_::Tcp> for TcpMatch { - type Err = InvalidMatch; - - fn try_from(m: &observe_request::match_::Tcp) -> Result { - use api::tap::observe_request::match_::tcp; - - let m = match m.match_.as_ref() { - None => return Err(InvalidMatch::Empty), - Some(m) => m, - }; - - let match_ = match *m { - tcp::Match::Ports(ref range) => { - // If either a minimum or maximum is not specified, the range is considered to - // be over a discrete value. - let min = if range.min == 0 { range.max } else { range.min }; - let max = if range.max == 0 { range.min } else { range.max }; - if min == 0 || max == 0 { - return Err(InvalidMatch::Empty); - } - if min > u32::from(::std::u16::MAX) || max > u32::from(::std::u16::MAX) { - return Err(InvalidMatch::InvalidPort); - } - TcpMatch::PortRange(min as u16, max as u16) - } - - tcp::Match::Netmask(ref netmask) => TcpMatch::Net(NetMatch::try_from(netmask)?), - }; - - Ok(match_) - } -} - -// ===== impl NetMatch ====== - -impl NetMatch { - fn matches(&self, addr: &net::IpAddr) -> bool { - match *self { - NetMatch::Net4(ref net) => match *addr { - net::IpAddr::V6(_) => false, - net::IpAddr::V4(ref addr) => net.contains(addr), - }, - NetMatch::Net6(ref net) => match *addr { - net::IpAddr::V4(_) => false, - net::IpAddr::V6(ref addr) => net.contains(addr), - }, - } - } -} - -impl<'a> TryFrom<&'a observe_request::match_::tcp::Netmask> for NetMatch { - type Err = InvalidMatch; - fn try_from(m: &'a observe_request::match_::tcp::Netmask) -> Result { - let mask = if m.mask == 0 { - return Err(InvalidMatch::Empty); - } else if m.mask > u32::from(::std::u8::MAX) { - return Err(InvalidMatch::InvalidNetwork); - } else { - m.mask as u8 - }; - - let ip = match m.ip.as_ref().and_then(|a| a.ip.as_ref()) { - Some(ip) => ip, - None => return Err(InvalidMatch::Empty), - }; - - let net = match *ip { - ip_address::Ip::Ipv4(ref n) => { - let net = - Ipv4Net::new((*n).into(), mask).map_err(|_| InvalidMatch::InvalidNetwork)?; - NetMatch::Net4(net) - } - ip_address::Ip::Ipv6(ref ip6) => { - let net = Ipv6Net::new(ip6.into(), mask).map_err(|_| InvalidMatch::InvalidNetwork)?; - NetMatch::Net6(net) - } - }; - - Ok(net) - } -} - -// ===== impl HttpMatch ====== - -impl HttpMatch { - fn matches(&self, req: &event::Request) -> bool { - match *self { - HttpMatch::Scheme(ref m) => req.scheme.as_ref() - .map(|s| m == s.as_ref()) - .unwrap_or(false), - - HttpMatch::Method(ref m) => *m == req.method, - - HttpMatch::Authority(ref m) => req.authority.as_ref() - .map(|a| Self::matches_string(m, a.as_str())) - .unwrap_or(false), - - HttpMatch::Path(ref m) => Self::matches_string(m, &req.path), - } - } - - fn matches_string( - string_match: &observe_request::match_::http::string_match::Match, - value: &str, - ) -> bool { - use api::tap::observe_request::match_::http::string_match::Match::*; - - match *string_match { - Exact(ref exact) => value == exact, - Prefix(ref prefix) => value.starts_with(prefix), - } - } -} - -impl<'a> TryFrom<&'a observe_request::match_::Http> for HttpMatch { - type Err = InvalidMatch; - fn try_from(m: &'a observe_request::match_::Http) -> Result { - use api::tap::observe_request::match_::http::Match as Pb; - - m.match_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .and_then(|m| match *m { - Pb::Scheme(ref s) => s.type_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .and_then(|s| { - s.try_to_string() - .map(HttpMatch::Scheme) - .map_err(|_| InvalidMatch::InvalidScheme) - }), - - Pb::Method(ref m) => m.type_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .and_then(|m| { - m.try_as_http() - .map(HttpMatch::Method) - .map_err(|_| InvalidMatch::InvalidHttpMethod) - }), - - Pb::Authority(ref a) => a.match_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .map(|a| HttpMatch::Authority(a.clone())), - - Pb::Path(ref p) => p.match_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .map(|p| HttpMatch::Path(p.clone())), - }) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use ipnet::{Contains, Ipv4Net, Ipv6Net}; - use quickcheck::*; - - use super::*; - use api::http_types; - - impl Arbitrary for LabelMatch { - fn arbitrary(g: &mut G) -> Self { - Self { - key: Arbitrary::arbitrary(g), - value: Arbitrary::arbitrary(g), - } - } - } - - impl Arbitrary for TcpMatch { - fn arbitrary(g: &mut G) -> Self { - if g.gen::() { - TcpMatch::Net(NetMatch::arbitrary(g)) - } else { - TcpMatch::PortRange(g.gen(), g.gen()) - } - } - } - - impl Arbitrary for NetMatch { - fn arbitrary(g: &mut G) -> Self { - if g.gen::() { - let addr = net::Ipv4Addr::arbitrary(g); - let bits = g.gen::() % 32; - let net = Ipv4Net::new(addr, bits).expect("ipv4 network address"); - NetMatch::Net4(net) - } else { - let addr = net::Ipv6Addr::arbitrary(g); - let bits = g.gen::() % 128; - let net = Ipv6Net::new(addr, bits).expect("ipv6 network address"); - NetMatch::Net6(net) - } - } - } - - quickcheck! { - fn tcp_from_proto(tcp: observe_request::match_::Tcp) -> bool { - use self::observe_request::match_::tcp; - - let err: Option = - tcp.match_.as_ref() - .map(|m| match *m { - tcp::Match::Ports(ref ps) => { - let ok = 0 < ps.min && - ps.min <= ps.max && - ps.max < u32::from(::std::u16::MAX); - if ok { None } else { Some(InvalidMatch::InvalidPort) } - } - tcp::Match::Netmask(ref n) => { - let ip = n.ip.as_ref().and_then(|a| a.ip.as_ref()); - if ip.is_some() { None } else { Some(InvalidMatch::Empty) } - } - }) - .unwrap_or(Some(InvalidMatch::Empty)); - - err == TcpMatch::try_from(&tcp).err() - } - - fn tcp_matches(m: TcpMatch, addr: net::SocketAddr) -> bool { - let matches = match (&m, addr.ip()) { - (&TcpMatch::Net(NetMatch::Net4(ref n)), net::IpAddr::V4(ip)) => { - n.contains(&ip) - } - (&TcpMatch::Net(NetMatch::Net6(ref n)), net::IpAddr::V6(ip)) => { - n.contains(&ip) - } - (&TcpMatch::PortRange(min, max), _) => { - min <= addr.port() && addr.port() <= max - } - _ => false - }; - - m.matches(&addr) == matches - } - - fn labels_from_proto(label: observe_request::match_::Label) -> bool { - let err: Option = - if label.key.is_empty() || label.value.is_empty() { - Some(InvalidMatch::Empty) - } else { - None - }; - - err == LabelMatch::try_from(&label).err() - } - - fn label_matches(l: LabelMatch, labels: HashMap) -> bool { - let matches = labels.get(&l.key) == Some(&l.value); - let mut ilabels = IndexMap::with_capacity(labels.len()); - for (k, v) in &labels { - ilabels.insert(k.clone(), v.clone()); - } - l.matches(&ilabels) == matches - } - - fn http_from_proto(http: observe_request::match_::Http) -> bool { - use self::observe_request::match_::http; - - let err = match http.match_.as_ref() { - None => Some(InvalidMatch::Empty), - Some(&http::Match::Method(ref m)) => { - match m.type_.as_ref() { - None => Some(InvalidMatch::Empty), - Some(&http_types::http_method::Type::Unregistered(ref m)) => if m.len() <= 15 { - let mut err = None; - if let Err(_) = ::http::Method::from_bytes(m.as_bytes()) { - err = Some(InvalidMatch::InvalidHttpMethod); - } - err - } else { - Some(InvalidMatch::InvalidHttpMethod) - } - Some(&http_types::http_method::Type::Registered(m)) => if m < 9 { - None - } else { - Some(InvalidMatch::InvalidHttpMethod) - } - } - } - Some(&http::Match::Scheme(ref m)) => { - match m.type_.as_ref() { - None => Some(InvalidMatch::Empty), - Some(&http_types::scheme::Type::Unregistered(_)) => None, - Some(&http_types::scheme::Type::Registered(m)) => { - if m < 2 { - None - } else { - Some(InvalidMatch::InvalidScheme) - } - } - } - } - Some(&http::Match::Authority(ref m)) => { - match m.match_ { - None => Some(InvalidMatch::Empty), - Some(_) => None, - } - } - Some(&http::Match::Path(ref m)) => { - match m.match_ { - None => Some(InvalidMatch::Empty), - Some(_) => None, - } - } - }; - - err == HttpMatch::try_from(&http).err() - } - - // TODO - // fn http_matches(m: HttpMatch, ctx: event::Request) -> bool { - // let matches = false; - // m.matches(&addr) == matches - // } - } -} diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 9da36f902f8ab..20d978dd40fd0 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -1,111 +1,176 @@ -use futures_mpsc_lossy; +use http; use indexmap::IndexMap; -use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; +use std::net; -use api::tap::observe_request; +use transport::tls; -pub mod event; -mod match_; +mod daemon; +mod grpc; mod service; -pub use self::event::{Direction, Endpoint, Event}; -pub use self::match_::InvalidMatch; -use self::match_::*; -pub use self::service::layer; +/// Instruments service stacks so that requests may be tapped. +pub type Layer = service::Layer>; -#[derive(Clone, Debug, Default)] -pub struct NextId(Arc); +/// A gRPC tap server. +pub type Server = grpc::Server>; -#[derive(Default, Debug)] -pub struct Taps { - by_id: IndexMap, -} +/// A Future that dispatches new tap requests to services and ensures that new +/// services are notified of active tap requests. +pub type Daemon = daemon::Daemon; + +// The maximum number of taps that may be live in the system at once. +const TAP_CAPACITY: usize = 100; + +// The maximum number of registrations that may be queued on the registration +// channel. +const REGISTER_CHANNEL_CAPACITY: usize = 10_000; + +// The number of events that may be buffered for a given response. +const PER_RESPONSE_EVENT_BUFFER_CAPACITY: usize = 400; -#[derive(Debug)] -pub struct Tap { - match_: Match, - tx: futures_mpsc_lossy::Sender, +/// Build the tap subsystem. +pub fn new() -> (Layer, Server, Daemon) { + let (daemon, register, subscribe) = daemon::new(); + let layer = Layer::new(register); + let server = Server::new(subscribe); + (layer, server, daemon) } -/// Indicates the tap is no longer receiving -struct Ended; +/// Inspects a request for a `Stack`. +/// +/// `Stack` target types +pub trait Inspect { + fn src_addr(&self, req: &http::Request) -> Option; + fn src_tls(&self, req: &http::Request) -> tls::Status; -impl Taps { - pub fn insert(&mut self, id: usize, tap: Tap) -> Option { - debug!("insert id={} tap={:?}", id, tap); - self.by_id.insert(id, tap) + fn dst_addr(&self, req: &http::Request) -> Option; + fn dst_labels(&self, req: &http::Request) -> Option<&IndexMap>; + fn dst_tls(&self, req: &http::Request) -> tls::Status; + + fn is_outbound(&self, req: &http::Request) -> bool; + + fn is_inbound(&self, req: &http::Request) -> bool { + !self.is_outbound(req) } - pub fn remove(&mut self, id: usize) -> Option { - debug!("remove id={}", id); - self.by_id.swap_remove(&id) + fn authority(&self, req: &http::Request) -> Option { + req.uri() + .authority_part() + .map(|a| a.as_str().to_owned()) + .or_else(|| { + req.headers() + .get(http::header::HOST) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_owned()) + }) + } +} + +/// The internal interface used between Layer, Server, and Daemon. +/// +/// These interfaces are provided to decouple the service implementation from any +/// Protobuf or gRPC concerns, hopefully to make this module more testable and +/// easier to change. +/// +/// This module is necessary to seal the traits, which must be public +/// for Layer/Server/Daemon, but need not be implemented outside of the `tap` +/// module. +mod iface { + use bytes::Buf; + use futures::{Future, Stream}; + use http; + use hyper::body::Payload; + use never::Never; + + use proxy::http::HasH2Reason; + + /// Registers a stack to receive taps. + pub trait Register { + type Tap: Tap; + type Taps: Stream; + + fn register(&mut self) -> Self::Taps; + } + + /// Advertises a Tap from a server to stacks. + pub trait Subscribe { + type Future: Future; + + /// Returns a `Future` that succeeds when the tap has been registered. + /// + /// If the tap cannot be registered, a `NoCapacity` error is returned. + fn subscribe(&mut self, tap: T) -> Self::Future; } /// - pub(super) fn inspect(&mut self, ev: &Event) { - if self.by_id.is_empty() { - return; - } - debug!( - "inspect taps={:?} event={:?}", - self.by_id.keys().collect::>(), - ev - ); - - // Iterate through taps by index so that items may be removed. - let mut idx = 0; - while idx < self.by_id.len() { - let (tap_id, inspect) = { - let (id, tap) = self.by_id.get_index(idx).unwrap(); - (*id, tap.inspect(ev)) - }; - - // If the tap is no longer receiving events, remove it. The index is only - // incremented on successs so that, when an item is removed, the swapped item - // is inspected on the next iteration OR, if the last item has been removed, - // `len()` will return `idx` and a subsequent iteration will not occur. - match inspect { - Ok(matched) => { - debug!("inspect tap={} match={}", tap_id, matched); - } - Err(Ended) => { - debug!("ended tap={}", tap_id); - self.by_id.swap_remove_index(idx); - continue; - } - } - - idx += 1; - } + pub trait Tap: Clone { + type TapRequest: TapRequest< + TapPayload = Self::TapRequestPayload, + TapResponse = Self::TapResponse, + TapResponsePayload = Self::TapResponsePayload, + >; + type TapRequestPayload: TapPayload; + type TapResponse: TapResponse; + type TapResponsePayload: TapPayload; + type Future: Future, Error = Never>; + + /// Returns `true` as l + fn can_tap_more(&self) -> bool; + + /// Determines whether a request should be tapped. + fn should_tap( + &self, + req: &http::Request, + inspect: &I, + ) -> bool; + + /// Initiate a tap. + /// + /// If the tap cannot be initialized, for instance because the tap has + /// completed or been canceled, then `None` is returned. + fn tap(&mut self) -> Self::Future; } -} -impl Tap { - pub fn new( - match_: &observe_request::Match, - capacity: usize, - ) -> Result<(Tap, futures_mpsc_lossy::Receiver), InvalidMatch> { - let (tx, rx) = futures_mpsc_lossy::channel(capacity); - let match_ = Match::new(match_)?; - let tap = Tap { match_, tx }; - Ok((tap, rx)) + pub trait TapRequest { + type TapPayload: TapPayload; + type TapResponse: TapResponse; + type TapResponsePayload: TapPayload; + + /// Start tapping a request, obtaining handles to tap its body and response. + fn open( + self, + req: &http::Request, + inspect: &I, + ) -> (Self::TapPayload, Self::TapResponse); } - fn inspect(&self, ev: &Event) -> Result { - if self.match_.matches(ev) { - return self - .tx - .lossy_send(ev.clone()) - .map_err(|_| Ended) - .map(|_| true); - } + pub trait TapPayload { + fn data(&mut self, data: &B); + + fn eos(self, headers: Option<&http::HeaderMap>); - Ok(false) + fn fail(self, error: &E); } -} -impl NextId { - fn next_id(&self) -> usize { - self.0.fetch_add(1, Ordering::Relaxed) + pub trait TapResponse { + type TapPayload: TapPayload; + + /// Record a response and obtain a handle to tap its body. + fn tap(self, rsp: &http::Response) -> Self::TapPayload; + + /// Record a service failure. + fn fail(self, error: &E); } + + #[derive(Debug)] + pub struct NoCapacity; + + impl ::std::fmt::Display for NoCapacity { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "capacity exhausted") + } + } + + impl ::std::error::Error for NoCapacity {} + } diff --git a/src/tap/service.rs b/src/tap/service.rs index 1e6753a0e10f6..568b2eba35a06 100644 --- a/src/tap/service.rs +++ b/src/tap/service.rs @@ -1,505 +1,362 @@ -use bytes::{Buf, IntoBuf}; -use futures::{Async, Future, Poll}; +use bytes::IntoBuf; +use futures::{future, Async, Future, Poll, Stream}; use http; -use h2; -use hyper::body::Payload; -use std::marker::PhantomData; -use std::sync::{Arc, Mutex}; -use std::time::Instant; -use tokio_timer::clock; - -use super::{event, NextId, Taps}; -use proxy::{ - self, - http::{h1, HasH2Reason}, -}; +use hyper::body::Payload as HyperPayload; + +use super::iface::{Register, Tap, TapPayload, TapRequest, TapResponse}; +use super::Inspect; +use proxy::http::HasH2Reason; use svc; /// A stack module that wraps services to record taps. #[derive(Clone, Debug)] -pub struct Layer { - next_id: NextId, - taps: Arc>, - _p: PhantomData (T, M)>, +pub struct Layer { + registry: R, } /// Wraps services to record taps. #[derive(Clone, Debug)] -pub struct Stack -where - N: svc::Stack, -{ - next_id: NextId, - taps: Arc>, - inner: N, - _p: PhantomData (T)>, +pub struct Stack { + registry: R, + inner: T, } /// A middleware that records HTTP taps. #[derive(Clone, Debug)] -pub struct Service { - endpoint: event::Endpoint, - next_id: NextId, - taps: Arc>, +pub struct Service { + tap_rx: R, + taps: Vec, inner: S, + inspect: I, } -#[derive(Debug, Clone)] -pub struct ResponseFuture { - inner: F, - meta: Option, - taps: Option>>, - request_open_at: Instant, +/// Fetches `TapRequest`s, instruments messages to be tapped, and executes a +/// request. +pub struct ResponseFuture +where + I: Inspect, + T: Tap, + A: HyperPayload, + A::Error: HasH2Reason, + S: svc::Service>>, +{ + state: FutState, } -#[derive(Debug)] -pub struct RequestBody { - inner: B, - meta: Option, - taps: Option>>, - request_open_at: Instant, - byte_count: usize, - frame_count: usize, +enum FutState +where + I: Inspect, + T: Tap, + A: HyperPayload, + A::Error: HasH2Reason, + S: svc::Service>>, +{ + Taps { + taps: future::JoinAll>, + inspect: I, + request: Option>, + service: S, + }, + Call { + taps: Vec, + call: S::Future, + }, } +// A `Payload` instrumented with taps. #[derive(Debug)] -pub struct ResponseBody { +pub struct Payload +where + B: HyperPayload, + B::Error: HasH2Reason, + T: TapPayload, +{ inner: B, - meta: Option, - taps: Option>>, - request_open_at: Instant, - response_open_at: Instant, - response_first_frame_at: Option, - byte_count: usize, - frame_count: usize, + taps: Vec, } // === Layer === -pub fn layer(next_id: NextId, taps: Arc>) -> Layer +impl Layer where - T: Clone + Into, - M: svc::Stack, - M::Value: svc::Service>, Response = http::Response>, - >>>::Error: HasH2Reason, - A: Payload, - B: Payload, + R: Register + Clone, { - Layer { - next_id, - taps, - _p: PhantomData, + pub(super) fn new(registry: R) -> Self { + Self { registry } } } -impl svc::Layer for Layer +impl svc::Layer for Layer where - T: Clone + Into, + T: Inspect + Clone, + R: Register + Clone, M: svc::Stack, { - type Value = as svc::Stack>::Value; + type Value = as svc::Stack>::Value; type Error = M::Error; - type Stack = Stack; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { - next_id: self.next_id.clone(), - taps: self.taps.clone(), inner, - _p: PhantomData, + registry: self.registry.clone(), } } } // === Stack === -impl svc::Stack for Stack +impl svc::Stack for Stack where - T: Clone + Into, + T: Inspect + Clone, + R: Register + Clone, M: svc::Stack, { - type Value = Service; + type Value = Service; type Error = M::Error; fn make(&self, target: &T) -> Result { let inner = self.inner.make(&target)?; + let tap_rx = self.registry.clone().register(); Ok(Service { - next_id: self.next_id.clone(), - endpoint: target.clone().into(), - taps: self.taps.clone(), inner, + tap_rx, + taps: Vec::default(), + inspect: target.clone(), }) } } // === Service === -impl svc::Service> for Service +impl svc::Service> for Service where - S: svc::Service< - http::Request>, - Response = http::Response, - >, + I: Inspect + Clone, + R: Stream, + T: Tap, + T::TapRequestPayload: Send + 'static, + T::TapResponsePayload: Send + 'static, + S: svc::Service>, Response = http::Response> + + Clone, S::Error: HasH2Reason, - A: Payload, - B: Payload, + A: HyperPayload, + A::Error: HasH2Reason, + B: HyperPayload, + B::Error: HasH2Reason, { - type Response = http::Response>; + type Response = http::Response>; type Error = S::Error; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { + // Load new taps from the tap server. + while let Ok(Async::Ready(Some(t))) = self.tap_rx.poll() { + self.taps.push(t); + } + // Drop taps that have been canceled or completed. + self.taps.retain(|t| t.can_tap_more()); + self.inner.poll_ready() } fn call(&mut self, req: http::Request) -> Self::Future { - let request_open_at = clock::now(); - - // Only tap a request iff a `Source` is known. - let meta = req.extensions().get::().map(|source| { - let scheme = req.uri().scheme_part().cloned(); - let authority = req - .uri() - .authority_part() - .cloned() - .or_else(|| h1::authority_from_host(&req)); - let path = req.uri().path().into(); - - event::Request { - id: self.next_id.next_id(), - endpoint: self.endpoint.clone(), - source: source.clone(), - method: req.method().clone(), - scheme, - authority, - path, + // Determine which active taps match the request and collect all of the + // futures requesting TapRequests from the tap server. + let mut tap_futs = Vec::new(); + for t in self.taps.iter_mut() { + if t.should_tap(&req, &self.inspect) { + tap_futs.push(t.tap()); } - }); - - let (head, inner) = req.into_parts(); - let mut body = RequestBody { - inner, - meta: meta.clone(), - taps: Some(self.taps.clone()), - request_open_at, - byte_count: 0, - frame_count: 0, - }; - - body.tap_open(); - if body.is_end_stream() { - body.tap_eos(Some(&head.headers)); } - let req = http::Request::from_parts(head, body); ResponseFuture { - inner: self.inner.call(req), - meta, - taps: Some(self.taps.clone()), - request_open_at, + state: FutState::Taps { + taps: future::join_all(tap_futs), + request: Some(req), + service: self.inner.clone(), + inspect: self.inspect.clone(), + }, } } } -impl Future for ResponseFuture +impl Future for ResponseFuture where - B: Payload, - F: Future>, - F::Error: HasH2Reason, + I: Inspect, + T: Tap, + T::TapRequestPayload: Send + 'static, + T::TapResponsePayload: Send + 'static, + S: svc::Service>, Response = http::Response>, + S::Error: HasH2Reason, + A: HyperPayload, + A::Error: HasH2Reason, + B: HyperPayload, + B::Error: HasH2Reason, { - type Item = http::Response>; - type Error = F::Error; + type Item = http::Response>; + type Error = S::Error; fn poll(&mut self) -> Poll { - let rsp = try_ready!(self.inner.poll().map_err(|e| self.tap_err(e))); - let response_open_at = clock::now(); - - let meta = self.meta.take().map(|request| event::Response { - request, - status: rsp.status(), - }); - - let (head, inner) = rsp.into_parts(); - let mut body = ResponseBody { - inner, - meta, - taps: self.taps.take(), - request_open_at: self.request_open_at, - response_open_at, - response_first_frame_at: None, - byte_count: 0, - frame_count: 0, - }; - - body.tap_open(); - if body.is_end_stream() { - trace!("ResponseFuture::poll: eos"); - body.tap_eos(Some(&head.headers)); - } - - let rsp = http::Response::from_parts(head, body); - Ok(rsp.into()) - } -} - -impl ResponseFuture -where - B: Payload, - F: Future>, - F::Error: HasH2Reason, -{ - fn tap_err(&mut self, e: F::Error) -> F::Error { - if let Some(request) = self.meta.take() { - let meta = event::Response { - request, - status: http::StatusCode::INTERNAL_SERVER_ERROR, - }; - - if let Some(t) = self.taps.take() { - let now = clock::now(); - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamResponseFail( - meta, - event::StreamResponseFail { - request_open_at: self.request_open_at, - response_open_at: now, - response_first_frame_at: None, - response_fail_at: now, - error: e.h2_reason().unwrap_or(h2::Reason::INTERNAL_ERROR), - bytes_sent: 0, - }, - )); + // Drive the state machine from FutState::Taps to FutState::Call to Ready. + loop { + self.state = match self.state { + FutState::Taps { + ref mut request, + ref mut service, + ref mut taps, + ref inspect, + } => { + // Get all the tap requests. If there's any sort of error, + // continue without taps. + let mut taps = match taps.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(taps)) => taps, + Err(_) => Vec::new(), + }; + + let req = request.take().expect("request must be set"); + + // Record the request and obtain request-body and response taps. + let mut req_taps = Vec::with_capacity(taps.len()); + let mut rsp_taps = Vec::with_capacity(taps.len()); + for tap in taps.drain(..).filter_map(|t| t) { + let (req_tap, rsp_tap) = tap.open(&req, inspect); + req_taps.push(req_tap); + rsp_taps.push(rsp_tap); + } + + // Install the request taps into the request body. + let req = { + let (head, inner) = req.into_parts(); + let body = Payload { + inner, + taps: req_taps, + }; + http::Request::from_parts(head, body) + }; + + // Call the service with the decorated request and save the + // response taps for when the call completes. + let call = service.call(req); + FutState::Call { + call, + taps: rsp_taps, + } } - } - } - - e - } -} - -// === RequestBody === - -impl> Payload for RequestBody { - type Data = B::Data; - type Error = B::Error; - - fn is_end_stream(&self) -> bool { - self.inner.is_end_stream() - } - - fn poll_data(&mut self) -> Poll, Self::Error> { - let poll_frame = self.inner.poll_data().map_err(|e| self.tap_err(e)); - let frame = try_ready!(poll_frame).map(|f| f.into_buf()); - - if self.meta.is_some() { - if let Some(ref f) = frame { - self.frame_count += 1; - self.byte_count += f.remaining(); - } - } - - if self.inner.is_end_stream() { - self.tap_eos(None); - } - - Ok(Async::Ready(frame)) - } - - fn poll_trailers(&mut self) -> Poll, Self::Error> { - let trailers = try_ready!(self.inner.poll_trailers().map_err(|e| self.tap_err(e))); - self.tap_eos(trailers.as_ref()); - Ok(Async::Ready(trailers)) - } -} - -impl RequestBody { - fn tap_open(&mut self) { - if let Some(meta) = self.meta.as_ref() { - if let Some(taps) = self.taps.as_ref() { - if let Ok(mut taps) = taps.lock() { - taps.inspect(&event::Event::StreamRequestOpen(meta.clone())); - } - } - } - } - - fn tap_eos(&mut self, _: Option<&http::HeaderMap>) { - if let Some(meta) = self.meta.take() { - if let Some(t) = self.taps.take() { - let now = clock::now(); - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamRequestEnd( - meta, - event::StreamRequestEnd { - request_open_at: self.request_open_at, - request_end_at: now, - }, - )); + FutState::Call { + ref mut call, + ref mut taps, + } => { + return match call.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(rsp)) => { + // Tap the response headers and use the response + // body taps to decorate the response body. + let taps = taps.drain(..).map(|t| t.tap(&rsp)).collect(); + let (head, inner) = rsp.into_parts(); + let mut body = Payload { inner, taps }; + if body.is_end_stream() { + body.eos(None); + } + Ok(Async::Ready(http::Response::from_parts(head, body))) + } + Err(e) => { + for tap in taps.drain(..) { + tap.fail(&e); + } + Err(e) + } + }; } - } - } - } - - fn tap_err(&mut self, e: h2::Error) -> h2::Error { - if let Some(meta) = self.meta.take() { - if let Some(t) = self.taps.take() { - let now = clock::now(); - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamRequestFail( - meta, - event::StreamRequestFail { - request_open_at: self.request_open_at, - request_fail_at: now, - error: e.reason().unwrap_or(h2::Reason::INTERNAL_ERROR), - }, - )); - } - } + }; } - - e - } -} - -impl Drop for RequestBody { - fn drop(&mut self) { - // TODO this should be recorded as a cancelation if the stream didn't end. - self.tap_eos(None); } } -// === ResponseBody === +// === Payload === -impl Default for ResponseBody { +// `T` need not implement Default. +impl Default for Payload +where + B: HyperPayload + Default, + B::Error: HasH2Reason, + T: TapPayload, +{ fn default() -> Self { - let now = clock::now(); Self { inner: B::default(), - meta: None, - taps: None, - request_open_at: now, - response_open_at: now, - response_first_frame_at: None, - byte_count: 0, - frame_count: 0, + taps: Vec::default(), } } } -impl> Payload for ResponseBody { - type Data = B::Data; +impl HyperPayload for Payload +where + B: HyperPayload, + B::Error: HasH2Reason, + T: TapPayload + Send + 'static, +{ + type Data = ::Buf; type Error = B::Error; fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } - fn poll_data(&mut self) -> Poll, Self::Error> { - trace!("ResponseBody::poll_data"); - let poll_frame = self.inner.poll_data().map_err(|e| self.tap_err(e)); + fn poll_data(&mut self) -> Poll, B::Error> { + let poll_frame = self.inner.poll_data().map_err(|e| self.err(e)); let frame = try_ready!(poll_frame).map(|f| f.into_buf()); - - if self.meta.is_some() { - if self.response_first_frame_at.is_none() { - self.response_first_frame_at = Some(clock::now()); - } - if let Some(ref f) = frame { - self.frame_count += 1; - self.byte_count += f.remaining(); - } - } - - if self.inner.is_end_stream() { - self.tap_eos(None); - } - + self.data(frame.as_ref()); Ok(Async::Ready(frame)) } - fn poll_trailers(&mut self) -> Poll, Self::Error> { - trace!("ResponseBody::poll_trailers"); - let trailers = try_ready!(self.inner.poll_trailers().map_err(|e| self.tap_err(e))); - self.tap_eos(trailers.as_ref()); + fn poll_trailers(&mut self) -> Poll, B::Error> { + let trailers = try_ready!(self.inner.poll_trailers().map_err(|e| self.err(e))); + self.eos(trailers.as_ref()); Ok(Async::Ready(trailers)) } } -impl ResponseBody { - fn tap_open(&mut self) { - if let Some(meta) = self.meta.as_ref() { - if let Some(taps) = self.taps.as_ref() { - if let Ok(mut taps) = taps.lock() { - taps.inspect(&event::Event::StreamResponseOpen( - meta.clone(), - event::StreamResponseOpen { - request_open_at: self.request_open_at, - response_open_at: clock::now(), - }, - )); - } +impl Payload +where + B: HyperPayload, + B::Error: HasH2Reason, + T: TapPayload, +{ + fn data(&mut self, frame: Option<&::Buf>) { + if let Some(ref f) = frame { + for ref mut tap in self.taps.iter_mut() { + tap.data::<::Buf>(f); } } - } - fn tap_eos(&mut self, trailers: Option<&http::HeaderMap>) { - trace!("ResponseBody::tap_eos: trailers={}", trailers.is_some()); - if let Some(meta) = self.meta.take() { - if let Some(t) = self.taps.take() { - let response_end_at = clock::now(); - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamResponseEnd( - meta, - event::StreamResponseEnd { - request_open_at: self.request_open_at, - response_open_at: self.response_open_at, - response_first_frame_at: self - .response_first_frame_at - .unwrap_or(response_end_at), - response_end_at, - grpc_status: trailers.and_then(Self::grpc_status), - bytes_sent: self.byte_count as u64, - }, - )); - } - } + if self.inner.is_end_stream() { + self.eos(None); } } - fn grpc_status(t: &http::HeaderMap) -> Option { - t.get("grpc-status") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()) + fn eos(&mut self, trailers: Option<&http::HeaderMap>) { + for tap in self.taps.drain(..) { + tap.eos(trailers); + } } - fn tap_err(&mut self, e: h2::Error) -> h2::Error { - trace!("ResponseBody::tap_err {:?}", e); - - if let Some(meta) = self.meta.take() { - if let Some(t) = self.taps.take() { - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamResponseFail( - meta, - event::StreamResponseFail { - request_open_at: self.request_open_at, - response_open_at: self.response_open_at, - response_first_frame_at: self.response_first_frame_at, - response_fail_at: clock::now(), - error: e.reason().unwrap_or(h2::Reason::INTERNAL_ERROR), - bytes_sent: self.byte_count as u64, - }, - )); - } - } + fn err(&mut self, error: B::Error) -> B::Error { + for tap in self.taps.drain(..) { + tap.fail(&error); } - e + error } } -impl Drop for ResponseBody { +impl Drop for Payload +where + B: HyperPayload, + B::Error: HasH2Reason, + T: TapPayload, +{ fn drop(&mut self) { - trace!("ResponseHandle::drop"); - // TODO this should be recorded as a cancelation if the stream didn't end. - self.tap_eos(None); + self.eos(None); } }