diff --git a/lib/stack/src/stack_per_request.rs b/lib/stack/src/stack_per_request.rs index 46adb66551326..a3b5e27bdb905 100644 --- a/lib/stack/src/stack_per_request.rs +++ b/lib/stack/src/stack_per_request.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use futures::Poll; use std::fmt; diff --git a/src/app/inbound.rs b/src/app/inbound.rs index 86aeab9c7f80b..338f72682fc0f 100644 --- a/src/app/inbound.rs +++ b/src/app/inbound.rs @@ -1,4 +1,5 @@ use http; +use indexmap::IndexMap; use std::fmt; use std::net::SocketAddr; @@ -33,10 +34,6 @@ impl classify::CanClassify for Endpoint { } impl Endpoint { - pub fn dst_name(&self) -> Option<&NameAddr> { - self.dst_name.as_ref() - } - fn target(&self) -> connect::Target { let tls = Conditional::None(tls::ReasonForNoTls::InternalTraffic); connect::Target::new(self.addr, tls) @@ -49,13 +46,32 @@ impl settings::router::HasConnect for Endpoint { } } -impl From for tap::Endpoint { - fn from(ep: Endpoint) -> Self { - tap::Endpoint { - direction: tap::Direction::In, - target: ep.target(), - labels: Default::default(), - } +impl tap::Inspect for Endpoint { + fn src_addr(&self, req: &http::Request) -> Option { + req.extensions().get::().map(|s| s.remote) + } + + fn src_tls(&self, req: &http::Request) -> tls::Status { + req.extensions() + .get::() + .map(|s| s.tls_status) + .unwrap_or_else(|| Conditional::None(tls::ReasonForNoTls::Disabled)) + } + + fn dst_addr(&self, _: &http::Request) -> Option { + Some(self.addr) + } + + fn dst_labels(&self, _: &http::Request) -> Option<&IndexMap> { + None + } + + fn dst_tls(&self, _: &http::Request) -> tls::Status { + Conditional::None(tls::ReasonForNoTls::InternalTraffic) + } + + fn is_outbound(&self, _: &http::Request) -> bool { + false } } @@ -103,10 +119,10 @@ impl router::Recognize> for RecognizeEndpoint { } pub mod orig_proto_downgrade { - use std::marker::PhantomData; use http; use proxy::http::orig_proto; use proxy::server::Source; + use std::marker::PhantomData; use svc; #[derive(Debug)] @@ -168,10 +184,7 @@ pub mod orig_proto_downgrade { fn make(&self, target: &Source) -> Result { debug!("downgrading requests; source={:?}", target); - self - .inner - .make(&target) - .map(orig_proto::Downgrade::new) + self.inner.make(&target).map(orig_proto::Downgrade::new) } } } diff --git a/src/app/main.rs b/src/app/main.rs index c43779cfed0a2..0f1dc4534d2fd 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -193,8 +193,7 @@ where panic!("invalid DNS configuration: {:?}", e); }); - let tap_next_id = tap::NextId::default(); - let (taps, observe) = control::Observe::new(100); + let (tap_layer, tap_grpc, tap_daemon) = tap::new(); let (ctl_http_metrics, ctl_http_report) = { let (m, r) = http_metrics::new::(config.metrics_retain_idle); @@ -329,7 +328,7 @@ where .push(buffer::layer()) .push(settings::router::layer::()) .push(orig_proto_upgrade::layer()) - .push(tap::layer(tap_next_id.clone(), taps.clone())) + .push(tap_layer.clone()) .push(metrics::layer::<_, classify::Response>( endpoint_http_metrics, )) @@ -478,7 +477,8 @@ where let endpoint_router = client_stack .push(buffer::layer()) .push(settings::router::layer::()) - .push(tap::layer(tap_next_id, taps)) + .push(phantom_data::layer()) + .push(tap_layer) .push(http_metrics::layer::<_, classify::Response>( endpoint_http_metrics, )) @@ -593,19 +593,19 @@ where let mut rt = current_thread::Runtime::new().expect("initialize admin thread runtime"); - let tap = serve_tap(control_listener, TapServer::new(observe)); - let metrics = control::serve_http( "metrics", metrics_listener, metrics::Serve::new(report), ); - // tap is already wrapped in a logging Future. - rt.spawn(tap); - // metrics_server is already wrapped in a logging Future. + rt.spawn(tap_daemon.map_err(|_| ())); + rt.spawn(serve_tap(control_listener, TapServer::new(tap_grpc))); + rt.spawn(metrics); + rt.spawn(::logging::admin().bg("dns-resolver").future(dns_bg)); + rt.spawn( ::logging::admin() .bg("resolver") diff --git a/src/app/outbound.rs b/src/app/outbound.rs index fb33fbdbbe0d4..b42ecc1b2e996 100644 --- a/src/app/outbound.rs +++ b/src/app/outbound.rs @@ -1,11 +1,12 @@ -use std::fmt; +use indexmap::IndexMap; +use std::{fmt, net}; use control::destination::{Metadata, ProtocolHint}; use proxy::http::settings; use svc; use tap; use transport::{connect, tls}; -use NameAddr; +use {Conditional, NameAddr}; #[derive(Clone, Debug)] pub struct Endpoint { @@ -52,17 +53,36 @@ impl svc::watch::WithUpdate for Endpoint { } } -impl From for tap::Endpoint { - fn from(ep: Endpoint) -> Self { - // TODO add route labels... - tap::Endpoint { - direction: tap::Direction::Out, - labels: ep.metadata.labels().clone(), - target: ep.connect.clone(), - } +impl tap::Inspect for Endpoint { + + fn src_addr(&self, req: &http::Request) -> Option { + use proxy::server::Source; + + req.extensions().get::().map(|s| s.remote) + } + + fn src_tls(&self, _: &http::Request) -> tls::Status { + Conditional::None(tls::ReasonForNoTls::InternalTraffic) + } + + fn dst_addr(&self, _: &http::Request) -> Option { + Some(self.connect.addr) + } + + fn dst_labels(&self, _: &http::Request) -> Option<&IndexMap> { + Some(self.metadata.labels()) + } + + fn dst_tls(&self, _: &http::Request) -> tls::Status { + self.metadata.tls_status() + } + + fn is_outbound(&self, _: &http::Request) -> bool { + true } } + pub mod discovery { use futures::{Async, Poll}; use std::net::SocketAddr; diff --git a/src/control/destination/mod.rs b/src/control/destination/mod.rs index dad8506f5087e..51c75132beaa1 100644 --- a/src/control/destination/mod.rs +++ b/src/control/destination/mod.rs @@ -222,4 +222,8 @@ impl Metadata { pub fn tls_identity(&self) -> Conditional<&tls::Identity, tls::ReasonForNoIdentity> { self.tls_identity.as_ref() } + + pub fn tls_status(&self) -> tls::Status { + self.tls_identity().map(|_| ()) + } } diff --git a/src/control/mod.rs b/src/control/mod.rs index af0c8cec53fd5..7a3f32be234f0 100644 --- a/src/control/mod.rs +++ b/src/control/mod.rs @@ -1,9 +1,6 @@ mod cache; pub mod destination; -mod observe; -pub mod pb; mod remote_stream; mod serve_http; -pub use self::observe::Observe; pub use self::serve_http::serve_http; diff --git a/src/control/observe.rs b/src/control/observe.rs deleted file mode 100644 index e4d149978be48..0000000000000 --- a/src/control/observe.rs +++ /dev/null @@ -1,178 +0,0 @@ -use futures::{future, Poll, Stream}; -use futures_mpsc_lossy; -use http::HeaderMap; -use indexmap::IndexMap; -use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use tower_grpc::{self as grpc, Response}; - -use api::tap::{server, ObserveRequest, TapEvent}; -use convert::*; -use tap::{event, Event, Tap, Taps}; - -#[derive(Clone, Debug)] -pub struct Observe { - next_id: Arc, - taps: Arc>, - tap_capacity: usize, -} - -pub struct TapEvents { - rx: futures_mpsc_lossy::Receiver, - remaining: usize, - current: IndexMap, - tap_id: usize, - taps: Arc>, -} - -impl Observe { - pub fn new(tap_capacity: usize) -> (Arc>, Observe) { - let taps = Arc::new(Mutex::new(Taps::default())); - - let observe = Observe { - next_id: Arc::new(AtomicUsize::new(0)), - tap_capacity, - taps: taps.clone(), - }; - - (taps, observe) - } -} - -impl server::Tap for Observe { - type ObserveStream = TapEvents; - type ObserveFuture = future::FutureResult, grpc::Error>; - - fn observe(&mut self, req: grpc::Request) -> Self::ObserveFuture { - if self.next_id.load(Ordering::Acquire) == ::std::usize::MAX { - return future::err(grpc::Error::Grpc( - grpc::Status::with_code(grpc::Code::Internal), - HeaderMap::new(), - )); - } - - let req = req.into_inner(); - let (tap, rx) = match req.match_ - .and_then(|m| Tap::new(&m, self.tap_capacity).ok()) - { - Some(m) => m, - None => { - return future::err(grpc::Error::Grpc( - grpc::Status::with_code(grpc::Code::InvalidArgument), - HeaderMap::new(), - )); - } - }; - - let tap_id = match self.taps.lock() { - Ok(mut taps) => { - let tap_id = self.next_id.fetch_add(1, Ordering::AcqRel); - let _ = (*taps).insert(tap_id, tap); - tap_id - } - Err(_) => { - return future::err(grpc::Error::Grpc( - grpc::Status::with_code(grpc::Code::Internal), - HeaderMap::new(), - )); - } - }; - - let events = TapEvents { - rx, - tap_id, - current: IndexMap::default(), - remaining: req.limit as usize, - taps: self.taps.clone(), - }; - - future::ok(Response::new(events)) - } -} - -impl Stream for TapEvents { - type Item = TapEvent; - type Error = grpc::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - loop { - if self.remaining == 0 && self.current.is_empty() { - trace!("tap completed"); - return Ok(None.into()); - } - - let poll: Poll, Self::Error> = - self.rx.poll().or_else(|_| Ok(None.into())); - - trace!("polling; remaining={}; current={}", self.remaining, { - use std::fmt::Write; - let mut s = String::new(); - write!(s, "[").unwrap(); - for id in self.current.keys() { - write!(s, "{},", *id).unwrap(); - } - write!(s, "]").unwrap(); - s - }); - match try_ready!(poll) { - Some(ev) => { - match ev { - Event::StreamRequestOpen(ref req) => { - if self.remaining == 0 { - trace!("exhausted; ignoring req={}", req.id); - continue; - } - trace!("insert req={}", req.id); - self.remaining -= 1; - let _ = self.current.insert(req.id, req.clone()); - } - Event::StreamRequestFail(ref req, _) => { - trace!("fail req={}", req.id); - if self.current.remove(&req.id).is_none() { - warn!("did not exist req={}", req.id); - continue; - } - } - Event::StreamResponseOpen(ref rsp, _) => { - trace!("response req={}", rsp.request.id); - if !self.current.contains_key(&rsp.request.id) { - warn!("did not exist req={}", rsp.request.id); - continue; - } - } - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => { - trace!("end req={}", rsp.request.id); - if self.current.remove(&rsp.request.id).is_none() { - warn!("did not exist req={}", rsp.request.id); - continue; - } - } - ev => { - trace!("ignoring event: {:?}", ev); - continue - } - } - - trace!("emitting tap event: {:?}", ev); - if let Ok(te) = TapEvent::try_from(&ev) { - trace!("emitted tap event"); - // TODO Do limit checks here. - return Ok(Some(te).into()); - } - } - None => { - return Ok(None.into()); - } - } - } - } -} - -impl Drop for TapEvents { - fn drop(&mut self) { - if let Ok(mut taps) = self.taps.lock() { - let _ = (*taps).remove(self.tap_id); - } - } -} diff --git a/src/control/pb.rs b/src/control/pb.rs deleted file mode 100644 index ef674eebc508e..0000000000000 --- a/src/control/pb.rs +++ /dev/null @@ -1,210 +0,0 @@ -#![allow(dead_code)] -#![cfg_attr(feature = "cargo-clippy", allow(clippy))] - -use std::error::Error; -use std::fmt; - -use api::*; -use convert::*; -use proxy; -use tap::event::{self, Event}; - -#[derive(Debug, Clone)] -pub struct UnknownEvent; - -impl fmt::Display for UnknownEvent { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "unknown tap event") - } -} - -impl Error for UnknownEvent { - #[inline] - fn description(&self) -> &str { - "unknown tap event" - } -} - -impl event::StreamResponseEnd { - fn to_tap_event(&self, ctx: &event::Request) -> tap::TapEvent { - let eos = self.grpc_status - .map(tap::Eos::from_grpc_status) - ; - - let end = tap::tap_event::http::ResponseEnd { - id: Some(tap::tap_event::http::StreamId { - base: 0, // TODO FIXME - stream: ctx.id as u64, - }), - since_request_init: Some(pb_elapsed(self.request_open_at, self.response_end_at)), - since_response_init: Some(pb_elapsed(self.response_open_at, self.response_end_at)), - response_bytes: self.bytes_sent, - eos, - }; - - tap::TapEvent { - proxy_direction: ctx.endpoint.direction.as_pb().into(), - source: Some((&ctx.source.remote).into()), - source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.target.addr).into()), - destination_meta: Some(ctx.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::ResponseEnd(end)), - })), - } - } -} - -impl event::StreamResponseFail { - fn to_tap_event(&self, ctx: &event::Request) -> tap::TapEvent { - let end = tap::tap_event::http::ResponseEnd { - id: Some(tap::tap_event::http::StreamId { - base: 0, // TODO FIXME - stream: ctx.id as u64, - }), - since_request_init: Some(pb_elapsed(self.request_open_at, self.response_fail_at)), - since_response_init: Some(pb_elapsed(self.response_open_at, self.response_fail_at)), - response_bytes: self.bytes_sent, - eos: Some(self.error.into()), - }; - - tap::TapEvent { - proxy_direction: ctx.endpoint.direction.as_pb().into(), - source: Some((&ctx.source.remote).into()), - source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.target.addr).into()), - destination_meta: Some(ctx.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::ResponseEnd(end)), - })), - } - } -} - -impl event::StreamRequestFail { - fn to_tap_event(&self, ctx: &event::Request) -> tap::TapEvent { - let end = tap::tap_event::http::ResponseEnd { - id: Some(tap::tap_event::http::StreamId { - base: 0, // TODO FIXME - stream: ctx.id as u64, - }), - since_request_init: Some(pb_elapsed(self.request_open_at, self.request_fail_at)), - since_response_init: None, - response_bytes: 0, - eos: Some(self.error.into()), - }; - - tap::TapEvent { - proxy_direction: ctx.endpoint.direction.as_pb().into(), - source: Some((&ctx.source.remote).into()), - source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.target.addr).into()), - destination_meta: Some(ctx.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::ResponseEnd(end)), - })), - } - } -} - -impl<'a> TryFrom<&'a Event> for tap::TapEvent { - type Err = UnknownEvent; - fn try_from(ev: &'a Event) -> Result { - let tap_ev = match *ev { - Event::StreamRequestOpen(ref ctx) => { - let init = tap::tap_event::http::RequestInit { - id: Some(tap::tap_event::http::StreamId { - base: 0, - // TODO FIXME - stream: ctx.id as u64, - }), - method: Some((&ctx.method).into()), - scheme: ctx.scheme.as_ref().map(http_types::Scheme::from), - authority: ctx.authority.as_ref() - .map(|a| a.as_str()) - .unwrap_or_default() - .into(), - path: ctx.path.clone(), - }; - - tap::TapEvent { - proxy_direction: ctx.endpoint.direction.as_pb().into(), - source: Some((&ctx.source.remote).into()), - source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.target.addr).into()), - destination_meta: Some(ctx.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::RequestInit(init)), - })), - } - } - - Event::StreamResponseOpen(ref ctx, ref rsp) => { - let init = tap::tap_event::http::ResponseInit { - id: Some(tap::tap_event::http::StreamId { - base: 0, - // TODO FIXME - stream: ctx.request.id as u64, - }), - since_request_init: Some(pb_elapsed(rsp.request_open_at, rsp.response_open_at)), - http_status: u32::from(ctx.status.as_u16()), - }; - - tap::TapEvent { - proxy_direction: ctx.request.endpoint.direction.as_pb().into(), - source: Some((&ctx.request.source.remote).into()), - source_meta: Some(ctx.request.source.src_meta()), - destination: Some((&ctx.request.endpoint.target.addr).into()), - destination_meta: Some(ctx.request.endpoint.dst_meta()), - event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { - event: Some(tap::tap_event::http::Event::ResponseInit(init)), - })), - } - } - - Event::StreamRequestFail(ref ctx, ref fail) => { - fail.to_tap_event(&ctx) - } - - Event::StreamResponseEnd(ref ctx, ref end) => { - end.to_tap_event(&ctx.request) - } - - Event::StreamResponseFail(ref ctx, ref fail) => { - fail.to_tap_event(&ctx.request) - } - - _ => return Err(UnknownEvent), - }; - - Ok(tap_ev) - } -} - -impl proxy::Source { - fn src_meta(&self) -> tap::tap_event::EndpointMeta { - let mut meta = tap::tap_event::EndpointMeta::default(); - - meta.labels.insert("tls".to_owned(), format!("{}", self.tls_status)); - - meta - } -} - -impl event::Direction { - fn as_pb(&self) -> tap::tap_event::ProxyDirection { - match self { - event::Direction::Out => tap::tap_event::ProxyDirection::Outbound, - event::Direction::In => tap::tap_event::ProxyDirection::Inbound, - } - } -} - -impl event::Endpoint { - fn dst_meta(&self) -> tap::tap_event::EndpointMeta { - let mut meta = tap::tap_event::EndpointMeta::default(); - meta.labels.extend(self.labels.clone()); - meta.labels.insert("tls".to_owned(), format!("{}", self.target.tls_status())); - meta - } -} diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 6ccd936b7ceab..8d9704ee0132d 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -23,6 +23,12 @@ pub trait HasH2Reason { fn h2_reason(&self) -> Option<::h2::Reason>; } +impl HasH2Reason for ::h2::Error { + fn h2_reason(&self) -> Option<::h2::Reason> { + self.reason() + } +} + impl HasH2Reason for super::buffer::ServiceError { fn h2_reason(&self) -> Option<::h2::Reason> { match self { diff --git a/src/proxy/http/profiles.rs b/src/proxy/http/profiles.rs index 0bddc6e3c79de..a682934effb91 100644 --- a/src/proxy/http/profiles.rs +++ b/src/proxy/http/profiles.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - extern crate tower_discover; use futures::Stream; diff --git a/src/proxy/http/settings.rs b/src/proxy/http/settings.rs index e3ac7775e65ff..3161131237091 100644 --- a/src/proxy/http/settings.rs +++ b/src/proxy/http/settings.rs @@ -191,6 +191,16 @@ pub mod router { } } + impl Clone for Service + where + M: svc::Stack, + M::Value: svc::Service>, + { + fn clone(&self) -> Self { + Self { router: self.router.clone() } + } + } + impl svc::Service> for Service where M: svc::Stack, @@ -212,7 +222,8 @@ pub mod router { } fn call(&mut self, req: http::Request) -> Self::Future { - ResponseFuture { inner: self.router.call(req) } + let inner = self.router.call(req); + ResponseFuture { inner } } } diff --git a/src/tap/ctx.rs b/src/tap/ctx.rs deleted file mode 100644 index d034cb26838e4..0000000000000 --- a/src/tap/ctx.rs +++ /dev/null @@ -1,85 +0,0 @@ -use indexmap::IndexMap; -use http; -use std::sync::{Arc, atomic::AtomicUsize}; -use std::sync::atomic::Ordering; - -use ctx; - - -/// A `RequestId` can be mapped to a `u64`. No `RequestId`s will map to the -/// same value within a process. -/// -/// XXX `usize` is too small except on 64-bit platforms. TODO: Use `u64` when -/// `AtomicU64` becomes stable. -#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] -pub struct RequestId(usize); - -/// Describes a stream's request headers. -#[derive(Debug)] -pub struct Request { - // A numeric ID useful for debugging & correlation. - pub id: RequestId, - - pub uri: http::Uri, - pub method: http::Method, - - /// Identifies the proxy server that received the request. - pub server: Arc, - - /// Identifies the proxy client that dispatched the request. - pub client: Arc, -} - -/// Describes a stream's response headers. -#[derive(Debug)] -pub struct Response { - pub request: Arc, - - pub status: http::StatusCode, -} - -impl RequestId { - fn next() -> Self { - static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(0); - RequestId(NEXT_REQUEST_ID.fetch_add(1, Ordering::SeqCst)) - } -} - -impl Into for RequestId { - fn into(self) -> u64 { - self.0 as u64 - } -} - -impl Request { - pub fn new( - request: &http::Request, - server: &Arc, - client: &Arc, - ) -> Arc { - let r = Self { - id: RequestId::next(), - uri: request.uri().clone(), - method: request.method().clone(), - server: Arc::clone(server), - client: Arc::clone(client), - }; - - Arc::new(r) - } - - pub fn labels(&self) -> &IndexMap { - self.client.labels() - } -} - -impl Response { - pub fn new(response: &http::Response, request: &Arc) -> Arc { - let r = Self { - status: response.status(), - request: Arc::clone(request), - }; - - Arc::new(r) - } -} diff --git a/src/tap/daemon.rs b/src/tap/daemon.rs new file mode 100644 index 0000000000000..9acc8f2f6fe6e --- /dev/null +++ b/src/tap/daemon.rs @@ -0,0 +1,200 @@ +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Poll, Stream}; +use never::Never; + +use super::iface::Tap; + +pub fn new() -> (Daemon, Register, Subscribe) { + let (svc_tx, svc_rx) = mpsc::channel(super::REGISTER_CHANNEL_CAPACITY); + let (tap_tx, tap_rx) = mpsc::channel(super::TAP_CAPACITY); + + let daemon = Daemon { + svc_rx, + svcs: Vec::default(), + + tap_rx, + taps: Vec::default(), + }; + + (daemon, Register(svc_tx), Subscribe(tap_tx)) +} + +/// A background task that connects a tap server and proxy services. +/// +/// The daemon provides `Register` to allow proxy services to listen for new +/// taps; and it provides `Subscribe` to allow the tap server to advertise new +/// taps to proxy services. +#[must_use = "daemon must be polled"] +#[derive(Debug)] +pub struct Daemon { + svc_rx: mpsc::Receiver>, + svcs: Vec>, + + tap_rx: mpsc::Receiver<(T, oneshot::Sender<()>)>, + taps: Vec, +} + +#[derive(Debug)] +pub struct Register(mpsc::Sender>); + +#[derive(Debug)] +pub struct Subscribe(mpsc::Sender<(T, oneshot::Sender<()>)>); + +#[derive(Debug)] +pub struct SubscribeFuture(FutState); + +#[derive(Debug)] +enum FutState { + Subscribe { + tap: Option, + tap_tx: mpsc::Sender<(T, oneshot::Sender<()>)>, + }, + Pending(oneshot::Receiver<()>), +} + +impl Future for Daemon { + type Item = (); + type Error = Never; + + fn poll(&mut self) -> Poll<(), Never> { + // Drop taps that are no longer active (i.e. the response stream has + // been dropped). + let tap_count = self.taps.len(); + self.taps.retain(|t| t.can_tap_more()); + trace!("retained {} of {} taps", self.taps.len(), tap_count); + + // Drop services that are no longer active. + for idx in (0..self.svcs.len()).rev() { + // It's "okay" if a service isn't ready to receive taps. We just + // fall back to being lossy rather than dropping the service + // entirely. + if self.svcs[idx].poll_ready().is_err() { + trace!("removing a service"); + self.svcs.swap_remove(idx); + } + } + + // Connect newly-created services to active taps. + while let Ok(Async::Ready(Some(mut svc))) = self.svc_rx.poll() { + trace!("registering a service"); + + // Notify the service of all active taps. + let mut dropped = false; + for tap in &self.taps { + debug_assert!(!dropped); + + let err = svc.try_send(tap.clone()).err(); + + // If the service has been dropped, make sure that it's not added. + dropped = err.as_ref().map(|e| e.is_disconnected()).unwrap_or(false); + + // If service can't receive any more taps, stop trying. + if err.is_some() { + break; + } + } + + if !dropped { + self.svcs.push(svc); + trace!("service registered"); + } + } + + // Connect newly-created taps to existing services. + while let Ok(Async::Ready(Some((tap, ack)))) = self.tap_rx.poll() { + trace!("subscribing a tap"); + if self.taps.len() == super::TAP_CAPACITY { + warn!("tap capacity exceeded"); + drop(ack); + continue; + } + if !tap.can_tap_more() { + trace!("tap already dropped"); + drop(ack); + continue; + } + + // Notify services of the new tap. If the service has been dropped, + // it's removed from the registry. If it's full, it isn't notified + // of the tap. + for idx in (0..self.svcs.len()).rev() { + let err = self.svcs[idx].try_send(tap.clone()).err(); + if err.map(|e| e.is_disconnected()).unwrap_or(false) { + trace!("removing a service"); + self.svcs.swap_remove(idx); + } + } + + self.taps.push(tap); + let _ = ack.send(()); + trace!("tap subscribed"); + } + + Ok(Async::NotReady) + } +} + +impl Clone for Register { + fn clone(&self) -> Self { + Register(self.0.clone()) + } +} + +impl super::iface::Register for Register { + type Tap = T; + type Taps = mpsc::Receiver; + + fn register(&mut self) -> Self::Taps { + let (tx, rx) = mpsc::channel(super::TAP_CAPACITY); + if let Err(_) = self.0.try_send(tx) { + debug!("failed to register service"); + } + rx + } +} + +impl Clone for Subscribe { + fn clone(&self) -> Self { + Subscribe(self.0.clone()) + } +} + +impl super::iface::Subscribe for Subscribe { + type Future = SubscribeFuture; + + fn subscribe(&mut self, tap: T) -> Self::Future { + SubscribeFuture(FutState::Subscribe { + tap: Some(tap), + tap_tx: self.0.clone(), + }) + } +} + +impl Future for SubscribeFuture { + type Item = (); + type Error = super::iface::NoCapacity; + + fn poll(&mut self) -> Poll<(), Self::Error> { + loop { + self.0 = match self.0 { + FutState::Subscribe { + ref mut tap, + ref mut tap_tx, + } => { + try_ready!(tap_tx.poll_ready().map_err(|_| super::iface::NoCapacity)); + + let tap = tap.take().expect("tap must be set"); + let (tx, rx) = oneshot::channel(); + tap_tx + .try_send((tap, tx)) + .map_err(|_| super::iface::NoCapacity)?; + + FutState::Pending(rx) + } + FutState::Pending(ref mut rx) => { + return rx.poll().map_err(|_| super::iface::NoCapacity); + } + } + } + } +} diff --git a/src/tap/event.rs b/src/tap/event.rs deleted file mode 100644 index dbb9b28aa3b57..0000000000000 --- a/src/tap/event.rs +++ /dev/null @@ -1,85 +0,0 @@ -use h2; -use http; -use indexmap::IndexMap; -use std::time::Instant; - -use proxy::Source; -use transport::connect; - -// TODO this should be replaced with a string name. -#[derive(Copy, Clone, Debug)] -pub enum Direction { In, Out } - -#[derive(Clone, Debug)] -pub struct Endpoint { - pub direction: Direction, - pub target: connect::Target, - pub labels: IndexMap, -} - -#[derive(Clone, Debug)] -pub struct Request { - pub id: usize, - pub source: Source, - pub endpoint: Endpoint, - pub method: http::Method, - pub scheme: Option, - pub authority: Option, - pub path: String, -} - -#[derive(Clone, Debug)] -pub struct Response { - pub request: Request, - pub status: http::StatusCode, -} - -#[derive(Clone, Debug)] -pub enum Event { - StreamRequestOpen(Request), - StreamRequestFail(Request, StreamRequestFail), - StreamRequestEnd(Request, StreamRequestEnd), - - StreamResponseOpen(Response, StreamResponseOpen), - StreamResponseFail(Response, StreamResponseFail), - StreamResponseEnd(Response, StreamResponseEnd), -} - -#[derive(Clone, Debug)] -pub struct StreamRequestFail { - pub request_open_at: Instant, - pub request_fail_at: Instant, - pub error: h2::Reason, -} - -#[derive(Clone, Debug)] -pub struct StreamRequestEnd { - pub request_open_at: Instant, - pub request_end_at: Instant, -} - -#[derive(Clone, Debug)] -pub struct StreamResponseOpen { - pub request_open_at: Instant, - pub response_open_at: Instant, -} - -#[derive(Clone, Debug)] -pub struct StreamResponseFail { - pub request_open_at: Instant, - pub response_open_at: Instant, - pub response_first_frame_at: Option, - pub response_fail_at: Instant, - pub error: h2::Reason, - pub bytes_sent: u64, -} - -#[derive(Clone, Debug)] -pub struct StreamResponseEnd { - pub request_open_at: Instant, - pub response_open_at: Instant, - pub response_first_frame_at: Instant, - pub response_end_at: Instant, - pub grpc_status: Option, - pub bytes_sent: u64, -} diff --git a/src/tap/grpc/match_.rs b/src/tap/grpc/match_.rs new file mode 100644 index 0000000000000..0b123ff13eae6 --- /dev/null +++ b/src/tap/grpc/match_.rs @@ -0,0 +1,471 @@ +use http; +use indexmap::IndexMap; +use ipnet::{Contains, Ipv4Net, Ipv6Net}; +use std::boxed::Box; +use std::net; +use std::{error, fmt}; + +use api::net::ip_address; +use api::tap::observe_request; +use convert::TryFrom; + +use tap::Inspect; + +#[derive(Clone, Debug)] +pub enum Match { + Any(Vec), + All(Vec), + Not(Box), + Source(TcpMatch), + Destination(TcpMatch), + DestinationLabel(LabelMatch), + Http(HttpMatch), +} + +#[derive(Debug, Eq, PartialEq)] +pub enum InvalidMatch { + Empty, + InvalidPort, + InvalidNetwork, + InvalidHttpMethod, + InvalidScheme, +} + +#[derive(Clone, Debug)] +pub struct LabelMatch { + key: String, + value: String, +} + +#[derive(Clone, Debug)] +pub enum TcpMatch { + // Inclusive + PortRange(u16, u16), + Net(NetMatch), +} + +#[derive(Clone, Debug)] +pub enum NetMatch { + Net4(Ipv4Net), + Net6(Ipv6Net), +} + +#[derive(Clone, Debug)] +pub enum HttpMatch { + Scheme(http::uri::Scheme), + Method(http::Method), + Path(observe_request::match_::http::string_match::Match), + Authority(observe_request::match_::http::string_match::Match), +} + +// ===== impl Match ====== + +impl Match { + fn from_seq(seq: observe_request::match_::Seq) -> Result, InvalidMatch> { + let mut new = Vec::with_capacity(seq.matches.len()); + for m in seq.matches.into_iter().filter_map(|m| m.match_) { + new.push(Self::try_from(m)?); + } + + Ok(new) + } + + pub fn matches(&self, req: &http::Request, inspect: &I) -> bool { + match self { + Match::Any(ref ms) => ms.iter().any(|m| m.matches(req, inspect)), + Match::All(ref ms) => ms.iter().all(|m| m.matches(req, inspect)), + Match::Not(ref not) => !not.matches(req, inspect), + Match::Source(ref src) => inspect + .src_addr(req) + .map(|s| src.matches(s)) + .unwrap_or(false), + Match::Destination(ref dst) => inspect + .dst_addr(req) + .map(|d| dst.matches(d)) + .unwrap_or(false), + Match::DestinationLabel(ref lbl) => inspect + .dst_labels(req) + .map(|l| lbl.matches(l)) + .unwrap_or(false), + Match::Http(ref http) => http.matches(req, inspect), + } + } +} + +impl Match { + pub fn try_new(m: Option) -> Result { + m.and_then(|m| m.match_) + .map(Self::try_from) + .unwrap_or_else(|| Err(InvalidMatch::Empty)) + } +} + +impl TryFrom for Match { + type Err = InvalidMatch; + + #[allow(unconditional_recursion)] + fn try_from(m: observe_request::match_::Match) -> Result { + use api::tap::observe_request::match_; + + match m { + match_::Match::All(seq) => Self::from_seq(seq).map(Match::All), + match_::Match::Any(seq) => Self::from_seq(seq).map(Match::Any), + match_::Match::Not(m) => m + .match_ + .ok_or(InvalidMatch::Empty) + .and_then(Self::try_from) + .map(|m| Match::Not(Box::new(m))), + match_::Match::Source(src) => TcpMatch::try_from(src).map(Match::Source), + match_::Match::Destination(dst) => TcpMatch::try_from(dst).map(Match::Destination), + match_::Match::DestinationLabel(l) => { + LabelMatch::try_from(l).map(Match::DestinationLabel) + } + match_::Match::Http(http) => HttpMatch::try_from(http).map(Match::Http), + } + } +} + +// ===== impl LabelMatch ====== + +impl LabelMatch { + fn matches(&self, labels: &IndexMap) -> bool { + labels.get(&self.key) == Some(&self.value) + } +} + +impl TryFrom for LabelMatch { + type Err = InvalidMatch; + + fn try_from(m: observe_request::match_::Label) -> Result { + if m.key.is_empty() || m.value.is_empty() { + return Err(InvalidMatch::Empty); + } + + Ok(LabelMatch { + key: m.key.clone(), + value: m.value.clone(), + }) + } +} + +// ===== impl TcpMatch ====== + +impl TcpMatch { + fn matches(&self, addr: net::SocketAddr) -> bool { + match self { + // If either a minimum or maximum is not specified, the range is considered to + // be over a discrete value. + TcpMatch::PortRange(min, max) => *min <= addr.port() && addr.port() <= *max, + TcpMatch::Net(net) => net.matches(&addr.ip()), + } + } +} + +impl TryFrom for TcpMatch { + type Err = InvalidMatch; + + fn try_from(m: observe_request::match_::Tcp) -> Result { + use api::tap::observe_request::match_::tcp; + + m.match_.ok_or(InvalidMatch::Empty).and_then(|t| match t { + tcp::Match::Ports(range) => { + // If either a minimum or maximum is not specified, the range is considered to + // be over a discrete value. + let min = if range.min == 0 { range.max } else { range.min }; + let max = if range.max == 0 { range.min } else { range.max }; + if min == 0 || max == 0 { + return Err(InvalidMatch::Empty); + } + if min > u32::from(::std::u16::MAX) || max > u32::from(::std::u16::MAX) { + return Err(InvalidMatch::InvalidPort); + } + Ok(TcpMatch::PortRange(min as u16, max as u16)) + } + + tcp::Match::Netmask(netmask) => NetMatch::try_from(netmask).map(TcpMatch::Net), + }) + } +} + +// ===== impl NetMatch ====== + +impl NetMatch { + fn matches(&self, addr: &net::IpAddr) -> bool { + match self { + NetMatch::Net4(net) => match addr { + net::IpAddr::V6(_) => false, + net::IpAddr::V4(addr) => net.contains(addr), + }, + NetMatch::Net6(net) => match addr { + net::IpAddr::V4(_) => false, + net::IpAddr::V6(addr) => net.contains(addr), + }, + } + } +} + +impl TryFrom for NetMatch { + type Err = InvalidMatch; + + fn try_from(m: observe_request::match_::tcp::Netmask) -> Result { + let mask = if m.mask == 0 { + return Err(InvalidMatch::Empty); + } else if m.mask > u32::from(::std::u8::MAX) { + return Err(InvalidMatch::InvalidNetwork); + } else { + m.mask as u8 + }; + + let net = match m.ip.and_then(|a| a.ip).ok_or(InvalidMatch::Empty)? { + ip_address::Ip::Ipv4(n) => { + let ip = n.into(); + let net = Ipv4Net::new(ip, mask).map_err(|_| InvalidMatch::InvalidNetwork)?; + NetMatch::Net4(net) + } + ip_address::Ip::Ipv6(n) => { + let ip = (&n).into(); + let net = Ipv6Net::new(ip, mask).map_err(|_| InvalidMatch::InvalidNetwork)?; + NetMatch::Net6(net) + } + }; + + Ok(net) + } +} + +// ===== impl HttpMatch ====== + +impl HttpMatch { + fn matches(&self, req: &http::Request, inspect: &I) -> bool { + match self { + HttpMatch::Scheme(ref m) => { + m == req.uri().scheme_part().unwrap_or(&http::uri::Scheme::HTTP) + } + + HttpMatch::Method(ref m) => m == req.method(), + + HttpMatch::Authority(ref m) => inspect + .authority(req) + .map(|a| Self::matches_string(m, &a)) + .unwrap_or(false), + + HttpMatch::Path(ref m) => Self::matches_string(m, req.uri().path()), + } + } + + fn matches_string( + string_match: &observe_request::match_::http::string_match::Match, + value: &str, + ) -> bool { + use api::tap::observe_request::match_::http::string_match::Match::*; + + match string_match { + Exact(ref exact) => value == exact, + Prefix(ref prefix) => value.starts_with(prefix), + } + } +} + +impl TryFrom for HttpMatch { + type Err = InvalidMatch; + fn try_from(m: observe_request::match_::Http) -> Result { + use api::http_types::scheme::{Registered, Type}; + use api::tap::observe_request::match_::http::Match as Pb; + + m.match_.ok_or(InvalidMatch::Empty).and_then(|m| match m { + Pb::Scheme(s) => s.type_.ok_or(InvalidMatch::Empty).and_then(|s| match s { + Type::Registered(reg) if reg == Registered::Http.into() => { + Ok(HttpMatch::Scheme(http::uri::Scheme::HTTP)) + } + Type::Registered(reg) if reg == Registered::Https.into() => { + Ok(HttpMatch::Scheme(http::uri::Scheme::HTTPS)) + } + Type::Registered(_) => Err(InvalidMatch::InvalidScheme), + Type::Unregistered(ref s) => http::uri::Scheme::from_shared(s.as_str().into()) + .map(HttpMatch::Scheme) + .map_err(|_| InvalidMatch::InvalidScheme), + }), + + Pb::Method(m) => m + .type_ + .ok_or(InvalidMatch::Empty) + .and_then(|m| m.try_as_http().map_err(|_| InvalidMatch::InvalidHttpMethod)) + .map(HttpMatch::Method), + + Pb::Authority(a) => a + .match_ + .ok_or(InvalidMatch::Empty) + .map(|a| HttpMatch::Authority(a)), + + Pb::Path(p) => p + .match_ + .ok_or(InvalidMatch::Empty) + .map(|p| HttpMatch::Path(p)), + }) + } +} + +#[cfg(test)] +mod tests { + use ipnet::{Contains, Ipv4Net, Ipv6Net}; + use quickcheck::*; + use std::collections::HashMap; + + use super::*; + use api::http_types; + + impl Arbitrary for LabelMatch { + fn arbitrary(g: &mut G) -> Self { + Self { + key: Arbitrary::arbitrary(g), + value: Arbitrary::arbitrary(g), + } + } + } + + impl Arbitrary for TcpMatch { + fn arbitrary(g: &mut G) -> Self { + if g.gen::() { + TcpMatch::Net(NetMatch::arbitrary(g)) + } else { + TcpMatch::PortRange(g.gen(), g.gen()) + } + } + } + + impl Arbitrary for NetMatch { + fn arbitrary(g: &mut G) -> Self { + if g.gen::() { + let addr = net::Ipv4Addr::arbitrary(g); + let bits = g.gen::() % 32; + let net = Ipv4Net::new(addr, bits).expect("ipv4 network address"); + NetMatch::Net4(net) + } else { + let addr = net::Ipv6Addr::arbitrary(g); + let bits = g.gen::() % 128; + let net = Ipv6Net::new(addr, bits).expect("ipv6 network address"); + NetMatch::Net6(net) + } + } + } + + quickcheck! { + fn tcp_from_proto(tcp: observe_request::match_::Tcp) -> bool { + use self::observe_request::match_::tcp; + + let err: Option = + tcp.match_.as_ref() + .map(|m| match m { + tcp::Match::Ports(ps) => { + let ok = 0 < ps.min && + ps.min <= ps.max && + ps.max < u32::from(::std::u16::MAX); + if ok { None } else { Some(InvalidMatch::InvalidPort) } + } + tcp::Match::Netmask(n) => { + match n.ip.as_ref().and_then(|ip| ip.ip.as_ref()) { + Some(_) => None, + None => Some(InvalidMatch::Empty), + } + } + }) + .unwrap_or(Some(InvalidMatch::Empty)); + + err == TcpMatch::try_from(tcp).err() + } + + fn tcp_matches(m: TcpMatch, addr: net::SocketAddr) -> bool { + let matches = match (&m, addr.ip()) { + (&TcpMatch::Net(NetMatch::Net4(ref n)), net::IpAddr::V4(ip)) => { + n.contains(&ip) + } + (&TcpMatch::Net(NetMatch::Net6(ref n)), net::IpAddr::V6(ip)) => { + n.contains(&ip) + } + (&TcpMatch::PortRange(min, max), _) => { + min <= addr.port() && addr.port() <= max + } + _ => false + }; + + m.matches(addr) == matches + } + + fn labels_from_proto(label: observe_request::match_::Label) -> bool { + let err: Option = + if label.key.is_empty() || label.value.is_empty() { + Some(InvalidMatch::Empty) + } else { + None + }; + + err == LabelMatch::try_from(label).err() + } + + fn label_matches(l: LabelMatch, labels: HashMap) -> bool { + use std::iter::FromIterator; + + let matches = labels.get(&l.key) == Some(&l.value); + l.matches(&IndexMap::from_iter(labels.into_iter())) == matches + } + + fn http_from_proto(http: observe_request::match_::Http) -> bool { + use self::observe_request::match_::http; + + let err = match http.match_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(http::Match::Method(ref m)) => { + match m.type_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(http_types::http_method::Type::Unregistered(ref m)) if m.len() > 15 => { + Some(InvalidMatch::InvalidHttpMethod) + } + Some(http_types::http_method::Type::Unregistered(m)) => { + ::http::Method::from_bytes(m.as_bytes()) + .err() + .map(|_| InvalidMatch::InvalidHttpMethod) + } + Some(http_types::http_method::Type::Registered(m)) if *m >= 9 => { + Some(InvalidMatch::InvalidHttpMethod) + } + Some(http_types::http_method::Type::Registered(_)) => None, + } + } + Some(http::Match::Scheme(m)) => match m.type_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(http_types::scheme::Type::Unregistered(_)) => None, + Some(http_types::scheme::Type::Registered(m)) if *m < 2 => None, + Some(http_types::scheme::Type::Registered(_)) => Some(InvalidMatch::InvalidScheme), + } + Some(http::Match::Authority(m)) => match m.match_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(_) => None, + } + Some(http::Match::Path(m)) => match m.match_.as_ref() { + None => Some(InvalidMatch::Empty), + Some(_) => None, + } + }; + + err == HttpMatch::try_from(http).err() + } + } +} + +impl fmt::Display for InvalidMatch { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}", + match self { + InvalidMatch::Empty => "missing required field", + InvalidMatch::InvalidPort => "invalid port number", + InvalidMatch::InvalidNetwork => "invalid network address", + InvalidMatch::InvalidHttpMethod => "invalid http method", + InvalidMatch::InvalidScheme => "invalid request scheme", + } + ) + } +} + +impl error::Error for InvalidMatch {} diff --git a/src/tap/grpc/mod.rs b/src/tap/grpc/mod.rs new file mode 100644 index 0000000000000..e179f0ba7efb7 --- /dev/null +++ b/src/tap/grpc/mod.rs @@ -0,0 +1,4 @@ +mod match_; +mod server; + +pub use self::server::{Server, Tap}; diff --git a/src/tap/grpc/server.rs b/src/tap/grpc/server.rs new file mode 100644 index 0000000000000..408b0045e4e38 --- /dev/null +++ b/src/tap/grpc/server.rs @@ -0,0 +1,549 @@ +use bytes::Buf; +use futures::sync::{mpsc, oneshot}; +use futures::{future, Async, Future, Poll, Stream}; +use http::HeaderMap; +use hyper::body::Payload; +use never::Never; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Weak}; +use std::time::Instant; +use tokio_timer::clock; +use tower_grpc::{self as grpc, Response}; + +use api::{http_types, pb_duration, tap as api}; + +use super::match_::Match; +use proxy::http::HasH2Reason; +use tap::{iface, Inspect}; + +#[derive(Clone, Debug)] +pub struct Server { + subscribe: T, + base_id: Arc, +} + +#[derive(Debug)] +pub struct ResponseFuture { + subscribe: F, + dispatch: Option, + events_rx: Option>, +} + +#[derive(Debug)] +pub struct ResponseStream { + dispatch: Option, + events_rx: mpsc::Receiver, +} + +#[derive(Debug)] +struct Dispatch { + base_id: u32, + count: usize, + limit: usize, + taps_rx: mpsc::Receiver>, + events_tx: mpsc::Sender, + match_handle: Arc, +} + +#[derive(Clone, Debug)] +struct TapTx { + id: api::tap_event::http::StreamId, + tx: mpsc::Sender, +} + +#[derive(Clone, Debug)] +pub struct Tap { + match_: Weak, + taps_tx: mpsc::Sender>, +} + +#[derive(Debug)] +pub struct TapFuture(FutState); + +#[derive(Debug)] +enum FutState { + Init { + request_init_at: Instant, + taps_tx: mpsc::Sender>, + }, + Pending { + request_init_at: Instant, + rx: oneshot::Receiver, + }, +} + +#[derive(Debug)] +pub struct TapRequest { + request_init_at: Instant, + tap: TapTx, +} + +#[derive(Debug)] +pub struct TapResponse { + base_event: api::TapEvent, + request_init_at: Instant, + tap: TapTx, +} + +#[derive(Debug)] +pub struct TapRequestPayload { + base_event: api::TapEvent, + tap: TapTx, +} + +#[derive(Debug)] +pub struct TapResponsePayload { + base_event: api::TapEvent, + request_init_at: Instant, + response_init_at: Instant, + response_bytes: usize, + tap: TapTx, +} + +// === impl Server === + +impl> Server { + pub(in tap) fn new(subscribe: T) -> Self { + let base_id = Arc::new(0.into()); + Self { base_id, subscribe } + } + + fn invalid_arg(event: http::header::HeaderValue) -> grpc::Error { + let status = grpc::Status::with_code(grpc::Code::InvalidArgument); + let mut headers = HeaderMap::new(); + headers.insert("grpc-message", event); + grpc::Error::Grpc(status, headers) + } +} + +impl api::server::Tap for Server +where + T: iface::Subscribe + Clone, +{ + type ObserveStream = ResponseStream; + type ObserveFuture = future::Either< + future::FutureResult, grpc::Error>, + ResponseFuture, + >; + + fn observe(&mut self, req: grpc::Request) -> Self::ObserveFuture { + let req = req.into_inner(); + + let limit = req.limit as usize; + if limit == 0 { + let v = http::header::HeaderValue::from_static("limit must be positive"); + return future::Either::A(future::err(Self::invalid_arg(v))); + }; + trace!("tap: limit={}", limit); + + // Read the match logic into a type we can use to evaluate against + // requests. This match will be shared (weakly) by all registered + // services to match requests. The response stream strongly holds the + // match until the response is complete. This way, services never + // evaluate matches for taps that have been completed or canceled. + let match_handle = match Match::try_new(req.match_) { + Ok(m) => Arc::new(m), + Err(e) => { + warn!("invalid tap request: {} ", e); + let v = format!("{}", e) + .parse() + .unwrap_or_else(|_| http::header::HeaderValue::from_static("invalid message")); + return future::Either::A(future::err(Self::invalid_arg(v))); + } + }; + + // Wrapping is okay. This is realy just to disambiguate events within a + // single tap session (i.e. that may consist of several tap requests). + let base_id = self.base_id.fetch_add(1, Ordering::AcqRel) as u32; + debug!("tap; id={}; match={:?}", base_id, match_handle); + + // The taps channel is used by services to acquire a `TapTx` for + // `Dispatch`, i.e. ensuring that no more than the requested number of + // taps are executed. + // + // The read side of this channel (held by `dispatch`) is dropped by the + // `ResponseStream` once the `limit` has been reached. This is dropped + // with the strong reference to `match_` so that services can determine + // when a Tap should be dropped. + // + // The response stream continues to process events for open streams + // until all streams have been completed. + let (taps_tx, taps_rx) = mpsc::channel(super::super::TAP_CAPACITY); + + let tap = Tap::new(Arc::downgrade(&match_handle), taps_tx); + let subscribe = self.subscribe.subscribe(tap); + + // The events channel is used to emit tap events to the response stream. + // + // At most `limit` copies of `events_tx` are dispatched to `taps_rx` + // requests. Each tapped request's sender is dropped when the response + // completes, so the event stream closes gracefully when all tapped + // requests are completed without additional coordination. + let (events_tx, events_rx) = + mpsc::channel(super::super::PER_RESPONSE_EVENT_BUFFER_CAPACITY); + + // Reads up to `limit` requests from from `taps_rx` and satisfies them + // with a cpoy of `events_tx`. + let dispatch = Dispatch { + base_id, + count: 0, + limit, + taps_rx, + events_tx, + match_handle, + }; + + future::Either::B(ResponseFuture { + subscribe, + dispatch: Some(dispatch), + events_rx: Some(events_rx), + }) + } +} + +impl> Future for ResponseFuture { + type Item = Response; + type Error = grpc::Error; + + fn poll(&mut self) -> Poll { + // Ensure that tap registers successfully. + match self.subscribe.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => {} + Err(_) => { + let status = grpc::Status::with_code(grpc::Code::ResourceExhausted); + let mut headers = HeaderMap::new(); + headers.insert("grpc-message", "Too many active taps".parse().unwrap()); + return Err(grpc::Error::Grpc(status, headers)); + } + } + + let rsp = ResponseStream { + dispatch: self.dispatch.take(), + events_rx: self.events_rx.take().expect("events_rx must be set"), + }; + + Ok(Response::new(rsp).into()) + } +} + +// === impl ResponseStream === + +impl Stream for ResponseStream { + type Item = api::TapEvent; + type Error = grpc::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // Drop the dispatch future once it completes so that services do not do + // any more matching against this tap. + // + // Furthermore, this drops the event sender so that `events_rx` closes + // gracefully when all open taps are complete. + self.dispatch = self.dispatch.take().and_then(|mut d| match d.poll() { + Ok(Async::NotReady) => Some(d), + Ok(Async::Ready(())) | Err(_) => None, + }); + + // Read events from taps. The receiver can't actually error, but we need + // to satisfy the type signature, so we coerce errors into EOS. + self.events_rx.poll().or_else(|_| Ok(None.into())) + } +} + +// === impl Dispatch === + +impl Future for Dispatch { + type Item = (); + type Error = (); + + /// Read tap requests, assign each an ID and send it back to the requesting + /// service with an event sender so that it may emittap events. + /// + /// Becomes ready when the limit has been reached. + fn poll(&mut self) -> Poll<(), Self::Error> { + while let Some(tx) = try_ready!(self.taps_rx.poll().map_err(|_| ())) { + debug_assert!(self.count < self.limit - 1); + + self.count += 1; + let tap = TapTx { + tx: self.events_tx.clone(), + id: api::tap_event::http::StreamId { + base: self.base_id, + stream: self.count as u64, + }, + }; + if tx.send(tap).is_err() { + // If the tap isn't sent, then restore the count. + self.count -= 1; + } + + if self.count == self.limit - 1 { + return Ok(Async::Ready(())); + } + } + + Ok(Async::Ready(())) + } +} + +// === impl Tap === + +impl Tap { + fn new(match_: Weak, taps_tx: mpsc::Sender>) -> Self { + Self { match_, taps_tx } + } +} + +impl iface::Tap for Tap { + type TapRequest = TapRequest; + type TapRequestPayload = TapRequestPayload; + type TapResponse = TapResponse; + type TapResponsePayload = TapResponsePayload; + type Future = TapFuture; + + fn can_tap_more(&self) -> bool { + self.match_.upgrade().is_some() + } + + fn should_tap(&self, req: &http::Request, inspect: &I) -> bool { + self.match_ + .upgrade() + .map(|m| m.matches(req, inspect)) + .unwrap_or(false) + } + + fn tap(&mut self) -> Self::Future { + TapFuture(FutState::Init { + request_init_at: clock::now(), + taps_tx: self.taps_tx.clone(), + }) + } +} + +// === impl TapFuture === + +impl Future for TapFuture { + type Item = Option; + type Error = Never; + + fn poll(&mut self) -> Poll { + loop { + self.0 = match self.0 { + FutState::Init { + ref request_init_at, + ref mut taps_tx, + } => { + match taps_tx.poll_ready() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => {} + Err(_) => return Ok(Async::Ready(None)), + } + let (tx, rx) = oneshot::channel(); + + // If this fails, polling `rx` will fail below. + let _ = taps_tx.try_send(tx); + + FutState::Pending { + request_init_at: *request_init_at, + rx, + } + } + FutState::Pending { + ref request_init_at, + ref mut rx, + } => { + return match rx.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(tap)) => { + let t = TapRequest { + request_init_at: *request_init_at, + tap, + }; + Ok(Some(t).into()) + } + Err(_) => Ok(None.into()), + }; + } + } + } + } +} + +// === impl TapRequest === + +impl iface::TapRequest for TapRequest { + type TapPayload = TapRequestPayload; + type TapResponse = TapResponse; + type TapResponsePayload = TapResponsePayload; + + fn open( + mut self, + req: &http::Request, + inspect: &I, + ) -> (TapRequestPayload, TapResponse) { + let base_event = base_event(req, inspect); + + let init = api::tap_event::http::RequestInit { + id: Some(self.tap.id.clone()), + method: Some(req.method().into()), + scheme: req.uri().scheme_part().map(http_types::Scheme::from), + authority: inspect.authority(req).unwrap_or_default(), + path: req.uri().path().into(), + }; +; + let event = api::TapEvent { + event: Some(api::tap_event::Event::Http(api::tap_event::Http { + event: Some(api::tap_event::http::Event::RequestInit(init)), + })), + ..base_event.clone() + }; + let _ = self.tap.tx.try_send(event); + + let req = TapRequestPayload { + tap: self.tap.clone(), + base_event: base_event.clone(), + }; + let rsp = TapResponse { + tap: self.tap, + base_event, + request_init_at: self.request_init_at, + }; + (req, rsp) + } +} + +// === impl TapResponse === + +impl iface::TapResponse for TapResponse { + type TapPayload = TapResponsePayload; + + fn tap(mut self, rsp: &http::Response) -> TapResponsePayload { + let response_init_at = clock::now(); + let init = api::tap_event::http::Event::ResponseInit(api::tap_event::http::ResponseInit { + id: Some(self.tap.id.clone()), + since_request_init: Some(pb_duration(response_init_at - self.request_init_at)), + http_status: rsp.status().as_u16().into(), + }); + + let event = api::TapEvent { + event: Some(api::tap_event::Event::Http(api::tap_event::Http { + event: Some(init), + })), + ..self.base_event.clone() + }; + let _ = self.tap.tx.try_send(event); + + TapResponsePayload { + base_event: self.base_event, + request_init_at: self.request_init_at, + response_init_at, + response_bytes: 0, + tap: self.tap, + } + } + + fn fail(mut self, err: &E) { + let response_end_at = clock::now(); + let reason = err.h2_reason(); + let end = api::tap_event::http::Event::ResponseEnd(api::tap_event::http::ResponseEnd { + id: Some(self.tap.id.clone()), + since_request_init: Some(pb_duration(response_end_at - self.request_init_at)), + since_response_init: None, + response_bytes: 0, + eos: Some(api::Eos { + end: reason.map(|r| api::eos::End::ResetErrorCode(r.into())), + }), + }); + + let event = api::TapEvent { + event: Some(api::tap_event::Event::Http(api::tap_event::Http { + event: Some(end), + })), + ..self.base_event + }; + let _ = self.tap.tx.try_send(event); + } +} + +// === impl TapRequestPayload === + +impl iface::TapPayload for TapRequestPayload { + fn data(&mut self, _: &B) {} + + fn eos(self, _: Option<&http::HeaderMap>) {} + + fn fail(self, _: &E) {} +} + +// === impl TapResponsePayload === + +impl iface::TapPayload for TapResponsePayload { + fn data(&mut self, data: &B) { + self.response_bytes += data.remaining(); + } + + fn eos(self, trls: Option<&http::HeaderMap>) { + let end = trls + .and_then(|t| t.get("grpc-status")) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .map(api::eos::End::GrpcStatusCode); + + self.send(end); + } + + fn fail(self, e: &E) { + let end = e.h2_reason().map(|r| api::eos::End::ResetErrorCode(r.into())); + self.send(end); + } +} + +impl TapResponsePayload { + fn send(mut self, end: Option) { + let response_end_at = clock::now(); + let end = api::tap_event::http::ResponseEnd { + id: Some(self.tap.id), + since_request_init: Some(pb_duration(response_end_at - self.request_init_at)), + since_response_init: Some(pb_duration(response_end_at - self.response_init_at)), + response_bytes: self.response_bytes as u64, + eos: Some(api::Eos { end }), + }; + + let event = api::TapEvent { + event: Some(api::tap_event::Event::Http(api::tap_event::Http { + event: Some(api::tap_event::http::Event::ResponseEnd(end)), + })), + ..self.base_event + }; + let _ = self.tap.tx.try_send(event); + } +} + +// All of the events emitted from tap have a common set of metadata. +// Build this once, without an `event`, so that it can be used to build +// each HTTP event. +fn base_event(req: &http::Request, inspect: &I) -> api::TapEvent { + api::TapEvent { + proxy_direction: if inspect.is_outbound(req) { + api::tap_event::ProxyDirection::Outbound.into() + } else { + api::tap_event::ProxyDirection::Inbound.into() + }, + source: inspect.src_addr(req).as_ref().map(|a| a.into()), + source_meta: { + let mut m = api::tap_event::EndpointMeta::default(); + let tls = format!("{}", inspect.src_tls(req)); + m.labels.insert("tls".to_owned(), tls); + Some(m) + }, + destination: inspect.dst_addr(req).as_ref().map(|a| a.into()), + destination_meta: inspect.dst_labels(req).map(|labels| { + let mut m = api::tap_event::EndpointMeta::default(); + m.labels.extend(labels.clone()); + let tls = format!("{}", inspect.dst_tls(req)); + m.labels.insert("tls".to_owned(), tls); + m + }), + event: None, + } +} diff --git a/src/tap/match_.rs b/src/tap/match_.rs deleted file mode 100644 index c4da4149a4130..0000000000000 --- a/src/tap/match_.rs +++ /dev/null @@ -1,543 +0,0 @@ -use indexmap::IndexMap; -use std::boxed::Box; -use std::net; - -use http; -use ipnet::{Contains, Ipv4Net, Ipv6Net}; - -use super::{event, Event}; -use api::net::ip_address; -use api::tap::observe_request; -use convert::TryFrom; - -#[derive(Clone, Debug)] -pub(super) enum Match { - Any(Vec), - All(Vec), - Not(Box), - Source(TcpMatch), - Destination(TcpMatch), - DestinationLabel(LabelMatch), - Http(HttpMatch), -} - -#[derive(Eq, PartialEq)] -pub enum InvalidMatch { - Empty, - InvalidPort, - InvalidNetwork, - InvalidHttpMethod, - InvalidScheme, - Unimplemented, -} - -#[derive(Clone, Debug)] -pub(super) struct LabelMatch { - key: String, - value: String, -} - -#[derive(Clone, Debug)] -pub(super) enum TcpMatch { - // Inclusive - PortRange(u16, u16), - Net(NetMatch), -} - -#[derive(Clone, Debug)] -pub(super) enum NetMatch { - Net4(Ipv4Net), - Net6(Ipv6Net), -} - -#[derive(Clone, Debug)] -pub(super) enum HttpMatch { - Scheme(String), - Method(http::Method), - Path(observe_request::match_::http::string_match::Match), - Authority(observe_request::match_::http::string_match::Match), -} - -// ===== impl Match ====== - -impl Match { - pub(super) fn matches(&self, ev: &Event) -> bool { - match *self { - Match::Any(ref any) => { - for m in any { - if m.matches(ev) { - return true; - } - } - false - } - - Match::All(ref all) => { - for m in all { - if !m.matches(ev) { - return false; - } - } - true - } - - Match::Not(ref not) => !not.matches(ev), - - Match::Source(ref src) => match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - src.matches(&req.source.remote) - } - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => src.matches(&rsp.request.source.remote), - _ => false, - }, - - Match::Destination(ref dst) => match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - dst.matches(&req.endpoint.target.addr) - } - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => - dst.matches(&rsp.request.endpoint.target.addr), - _ => false, - }, - - Match::DestinationLabel(ref label) => match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - label.matches(&req.endpoint.labels) - } - - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => { - label.matches(&rsp.request.endpoint.labels) - } - - _ => false, - } - - Match::Http(ref http) => match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - http.matches(req) - } - - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => http.matches(&rsp.request), - - _ => false, - }, - } - } - - pub(super) fn new(match_: &observe_request::Match) -> Result { - match_ - .match_ - .as_ref() - .map(Match::try_from) - .unwrap_or_else(|| Err(InvalidMatch::Empty)) - } - - fn from_seq(seq: &observe_request::match_::Seq) -> Result, InvalidMatch> { - let mut new = Vec::with_capacity(seq.matches.len()); - - for m in &seq.matches { - if let Some(m) = m.match_.as_ref() { - new.push(Self::try_from(m)?); - } - } - - Ok(new) - } -} - -impl<'a> TryFrom<&'a observe_request::match_::Match> for Match { - type Err = InvalidMatch; - - #[allow(unconditional_recursion)] - fn try_from(m: &observe_request::match_::Match) -> Result { - use api::tap::observe_request::match_; - - let match_ = match *m { - match_::Match::All(ref seq) => Match::All(Self::from_seq(seq)?), - - match_::Match::Any(ref seq) => Match::Any(Self::from_seq(seq)?), - - match_::Match::Not(ref m) => match m.match_.as_ref() { - Some(m) => Match::Not(Box::new(Self::try_from(m)?)), - None => return Err(InvalidMatch::Empty), - }, - - match_::Match::Source(ref src) => Match::Source(TcpMatch::try_from(src)?), - - match_::Match::Destination(ref dst) => Match::Destination(TcpMatch::try_from(dst)?), - - match_::Match::DestinationLabel(ref label) => { - Match::DestinationLabel(LabelMatch::try_from(label)?) - } - - match_::Match::Http(ref http) => Match::Http(HttpMatch::try_from(http)?), - }; - - Ok(match_) - } -} - -// ===== impl LabelMatch ====== - -impl LabelMatch { - fn matches(&self, labels: &IndexMap) -> bool { - labels.get(&self.key) == Some(&self.value) - } -} - -impl<'a> TryFrom<&'a observe_request::match_::Label> for LabelMatch { - type Err = InvalidMatch; - - fn try_from(m: &observe_request::match_::Label) -> Result { - if m.key.is_empty() || m.value.is_empty() { - return Err(InvalidMatch::Empty); - } - - Ok(LabelMatch { - key: m.key.clone(), - value: m.value.clone(), - }) - } -} - -// ===== impl TcpMatch ====== - -impl TcpMatch { - fn matches(&self, addr: &net::SocketAddr) -> bool { - match *self { - // If either a minimum or maximum is not specified, the range is considered to - // be over a discrete value. - TcpMatch::PortRange(min, max) => min <= addr.port() && addr.port() <= max, - - TcpMatch::Net(ref net) => net.matches(&addr.ip()), - } - } -} - -impl<'a> TryFrom<&'a observe_request::match_::Tcp> for TcpMatch { - type Err = InvalidMatch; - - fn try_from(m: &observe_request::match_::Tcp) -> Result { - use api::tap::observe_request::match_::tcp; - - let m = match m.match_.as_ref() { - None => return Err(InvalidMatch::Empty), - Some(m) => m, - }; - - let match_ = match *m { - tcp::Match::Ports(ref range) => { - // If either a minimum or maximum is not specified, the range is considered to - // be over a discrete value. - let min = if range.min == 0 { range.max } else { range.min }; - let max = if range.max == 0 { range.min } else { range.max }; - if min == 0 || max == 0 { - return Err(InvalidMatch::Empty); - } - if min > u32::from(::std::u16::MAX) || max > u32::from(::std::u16::MAX) { - return Err(InvalidMatch::InvalidPort); - } - TcpMatch::PortRange(min as u16, max as u16) - } - - tcp::Match::Netmask(ref netmask) => TcpMatch::Net(NetMatch::try_from(netmask)?), - }; - - Ok(match_) - } -} - -// ===== impl NetMatch ====== - -impl NetMatch { - fn matches(&self, addr: &net::IpAddr) -> bool { - match *self { - NetMatch::Net4(ref net) => match *addr { - net::IpAddr::V6(_) => false, - net::IpAddr::V4(ref addr) => net.contains(addr), - }, - NetMatch::Net6(ref net) => match *addr { - net::IpAddr::V4(_) => false, - net::IpAddr::V6(ref addr) => net.contains(addr), - }, - } - } -} - -impl<'a> TryFrom<&'a observe_request::match_::tcp::Netmask> for NetMatch { - type Err = InvalidMatch; - fn try_from(m: &'a observe_request::match_::tcp::Netmask) -> Result { - let mask = if m.mask == 0 { - return Err(InvalidMatch::Empty); - } else if m.mask > u32::from(::std::u8::MAX) { - return Err(InvalidMatch::InvalidNetwork); - } else { - m.mask as u8 - }; - - let ip = match m.ip.as_ref().and_then(|a| a.ip.as_ref()) { - Some(ip) => ip, - None => return Err(InvalidMatch::Empty), - }; - - let net = match *ip { - ip_address::Ip::Ipv4(ref n) => { - let net = - Ipv4Net::new((*n).into(), mask).map_err(|_| InvalidMatch::InvalidNetwork)?; - NetMatch::Net4(net) - } - ip_address::Ip::Ipv6(ref ip6) => { - let net = Ipv6Net::new(ip6.into(), mask).map_err(|_| InvalidMatch::InvalidNetwork)?; - NetMatch::Net6(net) - } - }; - - Ok(net) - } -} - -// ===== impl HttpMatch ====== - -impl HttpMatch { - fn matches(&self, req: &event::Request) -> bool { - match *self { - HttpMatch::Scheme(ref m) => req.scheme.as_ref() - .map(|s| m == s.as_ref()) - .unwrap_or(false), - - HttpMatch::Method(ref m) => *m == req.method, - - HttpMatch::Authority(ref m) => req.authority.as_ref() - .map(|a| Self::matches_string(m, a.as_str())) - .unwrap_or(false), - - HttpMatch::Path(ref m) => Self::matches_string(m, &req.path), - } - } - - fn matches_string( - string_match: &observe_request::match_::http::string_match::Match, - value: &str, - ) -> bool { - use api::tap::observe_request::match_::http::string_match::Match::*; - - match *string_match { - Exact(ref exact) => value == exact, - Prefix(ref prefix) => value.starts_with(prefix), - } - } -} - -impl<'a> TryFrom<&'a observe_request::match_::Http> for HttpMatch { - type Err = InvalidMatch; - fn try_from(m: &'a observe_request::match_::Http) -> Result { - use api::tap::observe_request::match_::http::Match as Pb; - - m.match_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .and_then(|m| match *m { - Pb::Scheme(ref s) => s.type_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .and_then(|s| { - s.try_to_string() - .map(HttpMatch::Scheme) - .map_err(|_| InvalidMatch::InvalidScheme) - }), - - Pb::Method(ref m) => m.type_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .and_then(|m| { - m.try_as_http() - .map(HttpMatch::Method) - .map_err(|_| InvalidMatch::InvalidHttpMethod) - }), - - Pb::Authority(ref a) => a.match_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .map(|a| HttpMatch::Authority(a.clone())), - - Pb::Path(ref p) => p.match_ - .as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .map(|p| HttpMatch::Path(p.clone())), - }) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use ipnet::{Contains, Ipv4Net, Ipv6Net}; - use quickcheck::*; - - use super::*; - use api::http_types; - - impl Arbitrary for LabelMatch { - fn arbitrary(g: &mut G) -> Self { - Self { - key: Arbitrary::arbitrary(g), - value: Arbitrary::arbitrary(g), - } - } - } - - impl Arbitrary for TcpMatch { - fn arbitrary(g: &mut G) -> Self { - if g.gen::() { - TcpMatch::Net(NetMatch::arbitrary(g)) - } else { - TcpMatch::PortRange(g.gen(), g.gen()) - } - } - } - - impl Arbitrary for NetMatch { - fn arbitrary(g: &mut G) -> Self { - if g.gen::() { - let addr = net::Ipv4Addr::arbitrary(g); - let bits = g.gen::() % 32; - let net = Ipv4Net::new(addr, bits).expect("ipv4 network address"); - NetMatch::Net4(net) - } else { - let addr = net::Ipv6Addr::arbitrary(g); - let bits = g.gen::() % 128; - let net = Ipv6Net::new(addr, bits).expect("ipv6 network address"); - NetMatch::Net6(net) - } - } - } - - quickcheck! { - fn tcp_from_proto(tcp: observe_request::match_::Tcp) -> bool { - use self::observe_request::match_::tcp; - - let err: Option = - tcp.match_.as_ref() - .map(|m| match *m { - tcp::Match::Ports(ref ps) => { - let ok = 0 < ps.min && - ps.min <= ps.max && - ps.max < u32::from(::std::u16::MAX); - if ok { None } else { Some(InvalidMatch::InvalidPort) } - } - tcp::Match::Netmask(ref n) => { - let ip = n.ip.as_ref().and_then(|a| a.ip.as_ref()); - if ip.is_some() { None } else { Some(InvalidMatch::Empty) } - } - }) - .unwrap_or(Some(InvalidMatch::Empty)); - - err == TcpMatch::try_from(&tcp).err() - } - - fn tcp_matches(m: TcpMatch, addr: net::SocketAddr) -> bool { - let matches = match (&m, addr.ip()) { - (&TcpMatch::Net(NetMatch::Net4(ref n)), net::IpAddr::V4(ip)) => { - n.contains(&ip) - } - (&TcpMatch::Net(NetMatch::Net6(ref n)), net::IpAddr::V6(ip)) => { - n.contains(&ip) - } - (&TcpMatch::PortRange(min, max), _) => { - min <= addr.port() && addr.port() <= max - } - _ => false - }; - - m.matches(&addr) == matches - } - - fn labels_from_proto(label: observe_request::match_::Label) -> bool { - let err: Option = - if label.key.is_empty() || label.value.is_empty() { - Some(InvalidMatch::Empty) - } else { - None - }; - - err == LabelMatch::try_from(&label).err() - } - - fn label_matches(l: LabelMatch, labels: HashMap) -> bool { - let matches = labels.get(&l.key) == Some(&l.value); - let mut ilabels = IndexMap::with_capacity(labels.len()); - for (k, v) in &labels { - ilabels.insert(k.clone(), v.clone()); - } - l.matches(&ilabels) == matches - } - - fn http_from_proto(http: observe_request::match_::Http) -> bool { - use self::observe_request::match_::http; - - let err = match http.match_.as_ref() { - None => Some(InvalidMatch::Empty), - Some(&http::Match::Method(ref m)) => { - match m.type_.as_ref() { - None => Some(InvalidMatch::Empty), - Some(&http_types::http_method::Type::Unregistered(ref m)) => if m.len() <= 15 { - let mut err = None; - if let Err(_) = ::http::Method::from_bytes(m.as_bytes()) { - err = Some(InvalidMatch::InvalidHttpMethod); - } - err - } else { - Some(InvalidMatch::InvalidHttpMethod) - } - Some(&http_types::http_method::Type::Registered(m)) => if m < 9 { - None - } else { - Some(InvalidMatch::InvalidHttpMethod) - } - } - } - Some(&http::Match::Scheme(ref m)) => { - match m.type_.as_ref() { - None => Some(InvalidMatch::Empty), - Some(&http_types::scheme::Type::Unregistered(_)) => None, - Some(&http_types::scheme::Type::Registered(m)) => { - if m < 2 { - None - } else { - Some(InvalidMatch::InvalidScheme) - } - } - } - } - Some(&http::Match::Authority(ref m)) => { - match m.match_ { - None => Some(InvalidMatch::Empty), - Some(_) => None, - } - } - Some(&http::Match::Path(ref m)) => { - match m.match_ { - None => Some(InvalidMatch::Empty), - Some(_) => None, - } - } - }; - - err == HttpMatch::try_from(&http).err() - } - - // TODO - // fn http_matches(m: HttpMatch, ctx: event::Request) -> bool { - // let matches = false; - // m.matches(&addr) == matches - // } - } -} diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 9da36f902f8ab..20d978dd40fd0 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -1,111 +1,176 @@ -use futures_mpsc_lossy; +use http; use indexmap::IndexMap; -use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; +use std::net; -use api::tap::observe_request; +use transport::tls; -pub mod event; -mod match_; +mod daemon; +mod grpc; mod service; -pub use self::event::{Direction, Endpoint, Event}; -pub use self::match_::InvalidMatch; -use self::match_::*; -pub use self::service::layer; +/// Instruments service stacks so that requests may be tapped. +pub type Layer = service::Layer>; -#[derive(Clone, Debug, Default)] -pub struct NextId(Arc); +/// A gRPC tap server. +pub type Server = grpc::Server>; -#[derive(Default, Debug)] -pub struct Taps { - by_id: IndexMap, -} +/// A Future that dispatches new tap requests to services and ensures that new +/// services are notified of active tap requests. +pub type Daemon = daemon::Daemon; + +// The maximum number of taps that may be live in the system at once. +const TAP_CAPACITY: usize = 100; + +// The maximum number of registrations that may be queued on the registration +// channel. +const REGISTER_CHANNEL_CAPACITY: usize = 10_000; + +// The number of events that may be buffered for a given response. +const PER_RESPONSE_EVENT_BUFFER_CAPACITY: usize = 400; -#[derive(Debug)] -pub struct Tap { - match_: Match, - tx: futures_mpsc_lossy::Sender, +/// Build the tap subsystem. +pub fn new() -> (Layer, Server, Daemon) { + let (daemon, register, subscribe) = daemon::new(); + let layer = Layer::new(register); + let server = Server::new(subscribe); + (layer, server, daemon) } -/// Indicates the tap is no longer receiving -struct Ended; +/// Inspects a request for a `Stack`. +/// +/// `Stack` target types +pub trait Inspect { + fn src_addr(&self, req: &http::Request) -> Option; + fn src_tls(&self, req: &http::Request) -> tls::Status; -impl Taps { - pub fn insert(&mut self, id: usize, tap: Tap) -> Option { - debug!("insert id={} tap={:?}", id, tap); - self.by_id.insert(id, tap) + fn dst_addr(&self, req: &http::Request) -> Option; + fn dst_labels(&self, req: &http::Request) -> Option<&IndexMap>; + fn dst_tls(&self, req: &http::Request) -> tls::Status; + + fn is_outbound(&self, req: &http::Request) -> bool; + + fn is_inbound(&self, req: &http::Request) -> bool { + !self.is_outbound(req) } - pub fn remove(&mut self, id: usize) -> Option { - debug!("remove id={}", id); - self.by_id.swap_remove(&id) + fn authority(&self, req: &http::Request) -> Option { + req.uri() + .authority_part() + .map(|a| a.as_str().to_owned()) + .or_else(|| { + req.headers() + .get(http::header::HOST) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_owned()) + }) + } +} + +/// The internal interface used between Layer, Server, and Daemon. +/// +/// These interfaces are provided to decouple the service implementation from any +/// Protobuf or gRPC concerns, hopefully to make this module more testable and +/// easier to change. +/// +/// This module is necessary to seal the traits, which must be public +/// for Layer/Server/Daemon, but need not be implemented outside of the `tap` +/// module. +mod iface { + use bytes::Buf; + use futures::{Future, Stream}; + use http; + use hyper::body::Payload; + use never::Never; + + use proxy::http::HasH2Reason; + + /// Registers a stack to receive taps. + pub trait Register { + type Tap: Tap; + type Taps: Stream; + + fn register(&mut self) -> Self::Taps; + } + + /// Advertises a Tap from a server to stacks. + pub trait Subscribe { + type Future: Future; + + /// Returns a `Future` that succeeds when the tap has been registered. + /// + /// If the tap cannot be registered, a `NoCapacity` error is returned. + fn subscribe(&mut self, tap: T) -> Self::Future; } /// - pub(super) fn inspect(&mut self, ev: &Event) { - if self.by_id.is_empty() { - return; - } - debug!( - "inspect taps={:?} event={:?}", - self.by_id.keys().collect::>(), - ev - ); - - // Iterate through taps by index so that items may be removed. - let mut idx = 0; - while idx < self.by_id.len() { - let (tap_id, inspect) = { - let (id, tap) = self.by_id.get_index(idx).unwrap(); - (*id, tap.inspect(ev)) - }; - - // If the tap is no longer receiving events, remove it. The index is only - // incremented on successs so that, when an item is removed, the swapped item - // is inspected on the next iteration OR, if the last item has been removed, - // `len()` will return `idx` and a subsequent iteration will not occur. - match inspect { - Ok(matched) => { - debug!("inspect tap={} match={}", tap_id, matched); - } - Err(Ended) => { - debug!("ended tap={}", tap_id); - self.by_id.swap_remove_index(idx); - continue; - } - } - - idx += 1; - } + pub trait Tap: Clone { + type TapRequest: TapRequest< + TapPayload = Self::TapRequestPayload, + TapResponse = Self::TapResponse, + TapResponsePayload = Self::TapResponsePayload, + >; + type TapRequestPayload: TapPayload; + type TapResponse: TapResponse; + type TapResponsePayload: TapPayload; + type Future: Future, Error = Never>; + + /// Returns `true` as l + fn can_tap_more(&self) -> bool; + + /// Determines whether a request should be tapped. + fn should_tap( + &self, + req: &http::Request, + inspect: &I, + ) -> bool; + + /// Initiate a tap. + /// + /// If the tap cannot be initialized, for instance because the tap has + /// completed or been canceled, then `None` is returned. + fn tap(&mut self) -> Self::Future; } -} -impl Tap { - pub fn new( - match_: &observe_request::Match, - capacity: usize, - ) -> Result<(Tap, futures_mpsc_lossy::Receiver), InvalidMatch> { - let (tx, rx) = futures_mpsc_lossy::channel(capacity); - let match_ = Match::new(match_)?; - let tap = Tap { match_, tx }; - Ok((tap, rx)) + pub trait TapRequest { + type TapPayload: TapPayload; + type TapResponse: TapResponse; + type TapResponsePayload: TapPayload; + + /// Start tapping a request, obtaining handles to tap its body and response. + fn open( + self, + req: &http::Request, + inspect: &I, + ) -> (Self::TapPayload, Self::TapResponse); } - fn inspect(&self, ev: &Event) -> Result { - if self.match_.matches(ev) { - return self - .tx - .lossy_send(ev.clone()) - .map_err(|_| Ended) - .map(|_| true); - } + pub trait TapPayload { + fn data(&mut self, data: &B); + + fn eos(self, headers: Option<&http::HeaderMap>); - Ok(false) + fn fail(self, error: &E); } -} -impl NextId { - fn next_id(&self) -> usize { - self.0.fetch_add(1, Ordering::Relaxed) + pub trait TapResponse { + type TapPayload: TapPayload; + + /// Record a response and obtain a handle to tap its body. + fn tap(self, rsp: &http::Response) -> Self::TapPayload; + + /// Record a service failure. + fn fail(self, error: &E); } + + #[derive(Debug)] + pub struct NoCapacity; + + impl ::std::fmt::Display for NoCapacity { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "capacity exhausted") + } + } + + impl ::std::error::Error for NoCapacity {} + } diff --git a/src/tap/service.rs b/src/tap/service.rs index 1e6753a0e10f6..568b2eba35a06 100644 --- a/src/tap/service.rs +++ b/src/tap/service.rs @@ -1,505 +1,362 @@ -use bytes::{Buf, IntoBuf}; -use futures::{Async, Future, Poll}; +use bytes::IntoBuf; +use futures::{future, Async, Future, Poll, Stream}; use http; -use h2; -use hyper::body::Payload; -use std::marker::PhantomData; -use std::sync::{Arc, Mutex}; -use std::time::Instant; -use tokio_timer::clock; - -use super::{event, NextId, Taps}; -use proxy::{ - self, - http::{h1, HasH2Reason}, -}; +use hyper::body::Payload as HyperPayload; + +use super::iface::{Register, Tap, TapPayload, TapRequest, TapResponse}; +use super::Inspect; +use proxy::http::HasH2Reason; use svc; /// A stack module that wraps services to record taps. #[derive(Clone, Debug)] -pub struct Layer { - next_id: NextId, - taps: Arc>, - _p: PhantomData (T, M)>, +pub struct Layer { + registry: R, } /// Wraps services to record taps. #[derive(Clone, Debug)] -pub struct Stack -where - N: svc::Stack, -{ - next_id: NextId, - taps: Arc>, - inner: N, - _p: PhantomData (T)>, +pub struct Stack { + registry: R, + inner: T, } /// A middleware that records HTTP taps. #[derive(Clone, Debug)] -pub struct Service { - endpoint: event::Endpoint, - next_id: NextId, - taps: Arc>, +pub struct Service { + tap_rx: R, + taps: Vec, inner: S, + inspect: I, } -#[derive(Debug, Clone)] -pub struct ResponseFuture { - inner: F, - meta: Option, - taps: Option>>, - request_open_at: Instant, +/// Fetches `TapRequest`s, instruments messages to be tapped, and executes a +/// request. +pub struct ResponseFuture +where + I: Inspect, + T: Tap, + A: HyperPayload, + A::Error: HasH2Reason, + S: svc::Service>>, +{ + state: FutState, } -#[derive(Debug)] -pub struct RequestBody { - inner: B, - meta: Option, - taps: Option>>, - request_open_at: Instant, - byte_count: usize, - frame_count: usize, +enum FutState +where + I: Inspect, + T: Tap, + A: HyperPayload, + A::Error: HasH2Reason, + S: svc::Service>>, +{ + Taps { + taps: future::JoinAll>, + inspect: I, + request: Option>, + service: S, + }, + Call { + taps: Vec, + call: S::Future, + }, } +// A `Payload` instrumented with taps. #[derive(Debug)] -pub struct ResponseBody { +pub struct Payload +where + B: HyperPayload, + B::Error: HasH2Reason, + T: TapPayload, +{ inner: B, - meta: Option, - taps: Option>>, - request_open_at: Instant, - response_open_at: Instant, - response_first_frame_at: Option, - byte_count: usize, - frame_count: usize, + taps: Vec, } // === Layer === -pub fn layer(next_id: NextId, taps: Arc>) -> Layer +impl Layer where - T: Clone + Into, - M: svc::Stack, - M::Value: svc::Service>, Response = http::Response>, - >>>::Error: HasH2Reason, - A: Payload, - B: Payload, + R: Register + Clone, { - Layer { - next_id, - taps, - _p: PhantomData, + pub(super) fn new(registry: R) -> Self { + Self { registry } } } -impl svc::Layer for Layer +impl svc::Layer for Layer where - T: Clone + Into, + T: Inspect + Clone, + R: Register + Clone, M: svc::Stack, { - type Value = as svc::Stack>::Value; + type Value = as svc::Stack>::Value; type Error = M::Error; - type Stack = Stack; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { - next_id: self.next_id.clone(), - taps: self.taps.clone(), inner, - _p: PhantomData, + registry: self.registry.clone(), } } } // === Stack === -impl svc::Stack for Stack +impl svc::Stack for Stack where - T: Clone + Into, + T: Inspect + Clone, + R: Register + Clone, M: svc::Stack, { - type Value = Service; + type Value = Service; type Error = M::Error; fn make(&self, target: &T) -> Result { let inner = self.inner.make(&target)?; + let tap_rx = self.registry.clone().register(); Ok(Service { - next_id: self.next_id.clone(), - endpoint: target.clone().into(), - taps: self.taps.clone(), inner, + tap_rx, + taps: Vec::default(), + inspect: target.clone(), }) } } // === Service === -impl svc::Service> for Service +impl svc::Service> for Service where - S: svc::Service< - http::Request>, - Response = http::Response, - >, + I: Inspect + Clone, + R: Stream, + T: Tap, + T::TapRequestPayload: Send + 'static, + T::TapResponsePayload: Send + 'static, + S: svc::Service>, Response = http::Response> + + Clone, S::Error: HasH2Reason, - A: Payload, - B: Payload, + A: HyperPayload, + A::Error: HasH2Reason, + B: HyperPayload, + B::Error: HasH2Reason, { - type Response = http::Response>; + type Response = http::Response>; type Error = S::Error; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { + // Load new taps from the tap server. + while let Ok(Async::Ready(Some(t))) = self.tap_rx.poll() { + self.taps.push(t); + } + // Drop taps that have been canceled or completed. + self.taps.retain(|t| t.can_tap_more()); + self.inner.poll_ready() } fn call(&mut self, req: http::Request) -> Self::Future { - let request_open_at = clock::now(); - - // Only tap a request iff a `Source` is known. - let meta = req.extensions().get::().map(|source| { - let scheme = req.uri().scheme_part().cloned(); - let authority = req - .uri() - .authority_part() - .cloned() - .or_else(|| h1::authority_from_host(&req)); - let path = req.uri().path().into(); - - event::Request { - id: self.next_id.next_id(), - endpoint: self.endpoint.clone(), - source: source.clone(), - method: req.method().clone(), - scheme, - authority, - path, + // Determine which active taps match the request and collect all of the + // futures requesting TapRequests from the tap server. + let mut tap_futs = Vec::new(); + for t in self.taps.iter_mut() { + if t.should_tap(&req, &self.inspect) { + tap_futs.push(t.tap()); } - }); - - let (head, inner) = req.into_parts(); - let mut body = RequestBody { - inner, - meta: meta.clone(), - taps: Some(self.taps.clone()), - request_open_at, - byte_count: 0, - frame_count: 0, - }; - - body.tap_open(); - if body.is_end_stream() { - body.tap_eos(Some(&head.headers)); } - let req = http::Request::from_parts(head, body); ResponseFuture { - inner: self.inner.call(req), - meta, - taps: Some(self.taps.clone()), - request_open_at, + state: FutState::Taps { + taps: future::join_all(tap_futs), + request: Some(req), + service: self.inner.clone(), + inspect: self.inspect.clone(), + }, } } } -impl Future for ResponseFuture +impl Future for ResponseFuture where - B: Payload, - F: Future>, - F::Error: HasH2Reason, + I: Inspect, + T: Tap, + T::TapRequestPayload: Send + 'static, + T::TapResponsePayload: Send + 'static, + S: svc::Service>, Response = http::Response>, + S::Error: HasH2Reason, + A: HyperPayload, + A::Error: HasH2Reason, + B: HyperPayload, + B::Error: HasH2Reason, { - type Item = http::Response>; - type Error = F::Error; + type Item = http::Response>; + type Error = S::Error; fn poll(&mut self) -> Poll { - let rsp = try_ready!(self.inner.poll().map_err(|e| self.tap_err(e))); - let response_open_at = clock::now(); - - let meta = self.meta.take().map(|request| event::Response { - request, - status: rsp.status(), - }); - - let (head, inner) = rsp.into_parts(); - let mut body = ResponseBody { - inner, - meta, - taps: self.taps.take(), - request_open_at: self.request_open_at, - response_open_at, - response_first_frame_at: None, - byte_count: 0, - frame_count: 0, - }; - - body.tap_open(); - if body.is_end_stream() { - trace!("ResponseFuture::poll: eos"); - body.tap_eos(Some(&head.headers)); - } - - let rsp = http::Response::from_parts(head, body); - Ok(rsp.into()) - } -} - -impl ResponseFuture -where - B: Payload, - F: Future>, - F::Error: HasH2Reason, -{ - fn tap_err(&mut self, e: F::Error) -> F::Error { - if let Some(request) = self.meta.take() { - let meta = event::Response { - request, - status: http::StatusCode::INTERNAL_SERVER_ERROR, - }; - - if let Some(t) = self.taps.take() { - let now = clock::now(); - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamResponseFail( - meta, - event::StreamResponseFail { - request_open_at: self.request_open_at, - response_open_at: now, - response_first_frame_at: None, - response_fail_at: now, - error: e.h2_reason().unwrap_or(h2::Reason::INTERNAL_ERROR), - bytes_sent: 0, - }, - )); + // Drive the state machine from FutState::Taps to FutState::Call to Ready. + loop { + self.state = match self.state { + FutState::Taps { + ref mut request, + ref mut service, + ref mut taps, + ref inspect, + } => { + // Get all the tap requests. If there's any sort of error, + // continue without taps. + let mut taps = match taps.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(taps)) => taps, + Err(_) => Vec::new(), + }; + + let req = request.take().expect("request must be set"); + + // Record the request and obtain request-body and response taps. + let mut req_taps = Vec::with_capacity(taps.len()); + let mut rsp_taps = Vec::with_capacity(taps.len()); + for tap in taps.drain(..).filter_map(|t| t) { + let (req_tap, rsp_tap) = tap.open(&req, inspect); + req_taps.push(req_tap); + rsp_taps.push(rsp_tap); + } + + // Install the request taps into the request body. + let req = { + let (head, inner) = req.into_parts(); + let body = Payload { + inner, + taps: req_taps, + }; + http::Request::from_parts(head, body) + }; + + // Call the service with the decorated request and save the + // response taps for when the call completes. + let call = service.call(req); + FutState::Call { + call, + taps: rsp_taps, + } } - } - } - - e - } -} - -// === RequestBody === - -impl> Payload for RequestBody { - type Data = B::Data; - type Error = B::Error; - - fn is_end_stream(&self) -> bool { - self.inner.is_end_stream() - } - - fn poll_data(&mut self) -> Poll, Self::Error> { - let poll_frame = self.inner.poll_data().map_err(|e| self.tap_err(e)); - let frame = try_ready!(poll_frame).map(|f| f.into_buf()); - - if self.meta.is_some() { - if let Some(ref f) = frame { - self.frame_count += 1; - self.byte_count += f.remaining(); - } - } - - if self.inner.is_end_stream() { - self.tap_eos(None); - } - - Ok(Async::Ready(frame)) - } - - fn poll_trailers(&mut self) -> Poll, Self::Error> { - let trailers = try_ready!(self.inner.poll_trailers().map_err(|e| self.tap_err(e))); - self.tap_eos(trailers.as_ref()); - Ok(Async::Ready(trailers)) - } -} - -impl RequestBody { - fn tap_open(&mut self) { - if let Some(meta) = self.meta.as_ref() { - if let Some(taps) = self.taps.as_ref() { - if let Ok(mut taps) = taps.lock() { - taps.inspect(&event::Event::StreamRequestOpen(meta.clone())); - } - } - } - } - - fn tap_eos(&mut self, _: Option<&http::HeaderMap>) { - if let Some(meta) = self.meta.take() { - if let Some(t) = self.taps.take() { - let now = clock::now(); - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamRequestEnd( - meta, - event::StreamRequestEnd { - request_open_at: self.request_open_at, - request_end_at: now, - }, - )); + FutState::Call { + ref mut call, + ref mut taps, + } => { + return match call.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(rsp)) => { + // Tap the response headers and use the response + // body taps to decorate the response body. + let taps = taps.drain(..).map(|t| t.tap(&rsp)).collect(); + let (head, inner) = rsp.into_parts(); + let mut body = Payload { inner, taps }; + if body.is_end_stream() { + body.eos(None); + } + Ok(Async::Ready(http::Response::from_parts(head, body))) + } + Err(e) => { + for tap in taps.drain(..) { + tap.fail(&e); + } + Err(e) + } + }; } - } - } - } - - fn tap_err(&mut self, e: h2::Error) -> h2::Error { - if let Some(meta) = self.meta.take() { - if let Some(t) = self.taps.take() { - let now = clock::now(); - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamRequestFail( - meta, - event::StreamRequestFail { - request_open_at: self.request_open_at, - request_fail_at: now, - error: e.reason().unwrap_or(h2::Reason::INTERNAL_ERROR), - }, - )); - } - } + }; } - - e - } -} - -impl Drop for RequestBody { - fn drop(&mut self) { - // TODO this should be recorded as a cancelation if the stream didn't end. - self.tap_eos(None); } } -// === ResponseBody === +// === Payload === -impl Default for ResponseBody { +// `T` need not implement Default. +impl Default for Payload +where + B: HyperPayload + Default, + B::Error: HasH2Reason, + T: TapPayload, +{ fn default() -> Self { - let now = clock::now(); Self { inner: B::default(), - meta: None, - taps: None, - request_open_at: now, - response_open_at: now, - response_first_frame_at: None, - byte_count: 0, - frame_count: 0, + taps: Vec::default(), } } } -impl> Payload for ResponseBody { - type Data = B::Data; +impl HyperPayload for Payload +where + B: HyperPayload, + B::Error: HasH2Reason, + T: TapPayload + Send + 'static, +{ + type Data = ::Buf; type Error = B::Error; fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } - fn poll_data(&mut self) -> Poll, Self::Error> { - trace!("ResponseBody::poll_data"); - let poll_frame = self.inner.poll_data().map_err(|e| self.tap_err(e)); + fn poll_data(&mut self) -> Poll, B::Error> { + let poll_frame = self.inner.poll_data().map_err(|e| self.err(e)); let frame = try_ready!(poll_frame).map(|f| f.into_buf()); - - if self.meta.is_some() { - if self.response_first_frame_at.is_none() { - self.response_first_frame_at = Some(clock::now()); - } - if let Some(ref f) = frame { - self.frame_count += 1; - self.byte_count += f.remaining(); - } - } - - if self.inner.is_end_stream() { - self.tap_eos(None); - } - + self.data(frame.as_ref()); Ok(Async::Ready(frame)) } - fn poll_trailers(&mut self) -> Poll, Self::Error> { - trace!("ResponseBody::poll_trailers"); - let trailers = try_ready!(self.inner.poll_trailers().map_err(|e| self.tap_err(e))); - self.tap_eos(trailers.as_ref()); + fn poll_trailers(&mut self) -> Poll, B::Error> { + let trailers = try_ready!(self.inner.poll_trailers().map_err(|e| self.err(e))); + self.eos(trailers.as_ref()); Ok(Async::Ready(trailers)) } } -impl ResponseBody { - fn tap_open(&mut self) { - if let Some(meta) = self.meta.as_ref() { - if let Some(taps) = self.taps.as_ref() { - if let Ok(mut taps) = taps.lock() { - taps.inspect(&event::Event::StreamResponseOpen( - meta.clone(), - event::StreamResponseOpen { - request_open_at: self.request_open_at, - response_open_at: clock::now(), - }, - )); - } +impl Payload +where + B: HyperPayload, + B::Error: HasH2Reason, + T: TapPayload, +{ + fn data(&mut self, frame: Option<&::Buf>) { + if let Some(ref f) = frame { + for ref mut tap in self.taps.iter_mut() { + tap.data::<::Buf>(f); } } - } - fn tap_eos(&mut self, trailers: Option<&http::HeaderMap>) { - trace!("ResponseBody::tap_eos: trailers={}", trailers.is_some()); - if let Some(meta) = self.meta.take() { - if let Some(t) = self.taps.take() { - let response_end_at = clock::now(); - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamResponseEnd( - meta, - event::StreamResponseEnd { - request_open_at: self.request_open_at, - response_open_at: self.response_open_at, - response_first_frame_at: self - .response_first_frame_at - .unwrap_or(response_end_at), - response_end_at, - grpc_status: trailers.and_then(Self::grpc_status), - bytes_sent: self.byte_count as u64, - }, - )); - } - } + if self.inner.is_end_stream() { + self.eos(None); } } - fn grpc_status(t: &http::HeaderMap) -> Option { - t.get("grpc-status") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()) + fn eos(&mut self, trailers: Option<&http::HeaderMap>) { + for tap in self.taps.drain(..) { + tap.eos(trailers); + } } - fn tap_err(&mut self, e: h2::Error) -> h2::Error { - trace!("ResponseBody::tap_err {:?}", e); - - if let Some(meta) = self.meta.take() { - if let Some(t) = self.taps.take() { - if let Ok(mut taps) = t.lock() { - taps.inspect(&event::Event::StreamResponseFail( - meta, - event::StreamResponseFail { - request_open_at: self.request_open_at, - response_open_at: self.response_open_at, - response_first_frame_at: self.response_first_frame_at, - response_fail_at: clock::now(), - error: e.reason().unwrap_or(h2::Reason::INTERNAL_ERROR), - bytes_sent: self.byte_count as u64, - }, - )); - } - } + fn err(&mut self, error: B::Error) -> B::Error { + for tap in self.taps.drain(..) { + tap.fail(&error); } - e + error } } -impl Drop for ResponseBody { +impl Drop for Payload +where + B: HyperPayload, + B::Error: HasH2Reason, + T: TapPayload, +{ fn drop(&mut self) { - trace!("ResponseHandle::drop"); - // TODO this should be recorded as a cancelation if the stream didn't end. - self.tap_eos(None); + self.eos(None); } }