diff --git a/Cargo.lock b/Cargo.lock index 2b9c6fc93688a..373c17be975b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1221,6 +1221,7 @@ dependencies = [ "futures", "http", "hyper", + "itertools", "linkerd-policy-controller-core", "linkerd2-proxy-api", "maplit", @@ -1258,7 +1259,6 @@ dependencies = [ "chrono", "futures", "http", - "k8s-gateway-api", "k8s-openapi", "kube", "kubert", diff --git a/policy-controller/core/src/outbound.rs b/policy-controller/core/src/outbound.rs index 18070e7499df9..4f5cb40c36be0 100644 --- a/policy-controller/core/src/outbound.rs +++ b/policy-controller/core/src/outbound.rs @@ -1,5 +1,6 @@ use crate::routes::{ - GroupKindNamespaceName, HeaderModifierFilter, HostMatch, HttpRouteMatch, RequestRedirectFilter, + FailureInjectorFilter, GroupKindNamespaceName, HeaderModifierFilter, HostMatch, HttpRouteMatch, + RequestRedirectFilter, }; use ahash::AHashMap as HashMap; use anyhow::Result; @@ -26,9 +27,21 @@ pub struct OutboundDiscoverTarget { pub source_namespace: String, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum TypedOutboundRoute { + Http(OutboundRoute), +} + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub enum OutboundRouteCollection { + #[default] + Empty, + Http(HashMap>), +} + #[derive(Clone, Debug, PartialEq)] pub struct OutboundPolicy { - pub http_routes: HashMap, + pub routes: OutboundRouteCollection, pub authority: String, pub name: String, pub namespace: String, @@ -38,18 +51,18 @@ pub struct OutboundPolicy { } #[derive(Clone, Debug, PartialEq, Eq)] -pub struct HttpRoute { +pub struct OutboundRoute { pub hostnames: Vec, - pub rules: Vec, + pub rules: Vec>, - /// This is required for ordering returned `HttpRoute`s by their creation - /// timestamp. + /// This is required for ordering returned routes + /// by their creation timestamp. pub creation_timestamp: Option>, } #[derive(Clone, Debug, PartialEq, Eq)] -pub struct HttpRouteRule { - pub matches: Vec, +pub struct OutboundRouteRule { + pub matches: Vec, pub backends: Vec, pub request_timeout: Option, pub backend_request_timeout: Option, @@ -98,4 +111,53 @@ pub enum Filter { RequestHeaderModifier(HeaderModifierFilter), ResponseHeaderModifier(HeaderModifierFilter), RequestRedirect(RequestRedirectFilter), + FailureInjector(FailureInjectorFilter), +} + +// === impl TypedOutboundRoute === + +impl From> for TypedOutboundRoute { + fn from(route: OutboundRoute) -> Self { + Self::Http(route) + } +} + +// === impl OutboundRouteCollection === + +impl OutboundRouteCollection { + pub fn is_empty(&self) -> bool { + matches!(self, Self::Empty) + } + + pub fn remove(&mut self, key: &GroupKindNamespaceName) { + match self { + Self::Empty => {} + Self::Http(routes) => { + routes.remove(key); + if routes.is_empty() { + *self = Self::Empty; + } + } + } + } + + pub fn insert>( + &mut self, + key: GroupKindNamespaceName, + route: Route, + ) -> Result> { + let route = route.into(); + + match (self, route) { + (this @ Self::Empty, TypedOutboundRoute::Http(route)) => { + let mut routes = HashMap::default(); + let inserted = routes.insert(key, route).map(Into::into); + *this = Self::Http(routes); + Ok(inserted) + } + (Self::Http(routes), TypedOutboundRoute::Http(route)) => { + Ok(routes.insert(key, route).map(Into::into)) + } + } + } } diff --git a/policy-controller/core/src/routes.rs b/policy-controller/core/src/routes.rs index d3ff09cf3871c..41cbf7ae3bae4 100644 --- a/policy-controller/core/src/routes.rs +++ b/policy-controller/core/src/routes.rs @@ -1,4 +1,5 @@ use anyhow::Result; + pub use http::{ header::{HeaderName, HeaderValue}, uri::Scheme, @@ -8,7 +9,6 @@ use regex::Regex; use std::{borrow::Cow, num::NonZeroU16}; #[derive(Clone, Debug, Hash, PartialEq, Eq)] - pub struct GroupKindName { pub group: Cow<'static, str>, pub kind: Cow<'static, str>, @@ -126,6 +126,19 @@ impl GroupKindName { } } +// === impl HttpRouteMatch === + +impl Default for HttpRouteMatch { + fn default() -> Self { + Self { + method: None, + headers: vec![], + query_params: vec![], + path: Some(PathMatch::Prefix("/".to_string())), + } + } +} + // === impl PathMatch === impl PartialEq for PathMatch { diff --git a/policy-controller/grpc/Cargo.toml b/policy-controller/grpc/Cargo.toml index edc30ded5d9f7..0c625132701f2 100644 --- a/policy-controller/grpc/Cargo.toml +++ b/policy-controller/grpc/Cargo.toml @@ -10,8 +10,9 @@ async-stream = "0.3" async-trait = "0.1" http = "0.2" drain = "0.1" -hyper = { version = "0.14", features = ["http2", "server", "tcp"] } futures = { version = "0.3", default-features = false } +hyper = { version = "0.14", features = ["http2", "server", "tcp"] } +itertools = "0.12" linkerd-policy-controller-core = { path = "../core" } maplit = "1" prost-types = "0.12.6" diff --git a/policy-controller/grpc/src/outbound.rs b/policy-controller/grpc/src/outbound.rs index b7ab0c9436635..41adb37b50656 100644 --- a/policy-controller/grpc/src/outbound.rs +++ b/policy-controller/grpc/src/outbound.rs @@ -1,5 +1,6 @@ use crate::{routes, workload}; use futures::prelude::*; +use itertools::Itertools; use linkerd2_proxy_api::{ self as api, destination, meta::{metadata, Metadata}, @@ -10,10 +11,10 @@ use linkerd2_proxy_api::{ }; use linkerd_policy_controller_core::{ outbound::{ - Backend, DiscoverOutboundPolicy, Filter, HttpRoute, HttpRouteRule, OutboundDiscoverTarget, - OutboundPolicy, OutboundPolicyStream, + Backend, DiscoverOutboundPolicy, Filter, OutboundDiscoverTarget, OutboundPolicy, + OutboundPolicyStream, OutboundRoute, OutboundRouteCollection, OutboundRouteRule, }, - routes::GroupKindNamespaceName, + routes::{GroupKindNamespaceName, HttpRouteMatch}, }; use std::{net::SocketAddr, num::NonZeroU16, str::FromStr, sync::Arc, time}; @@ -190,7 +191,7 @@ fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatc // If the server starts shutting down, close the stream so that it doesn't hold the // server open. - _ = (&mut shutdown) => { + _ = &mut shutdown => { return; } } @@ -202,80 +203,78 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy { let backend = default_backend(&outbound); let kind = if outbound.opaque { - linkerd2_proxy_api::outbound::proxy_protocol::Kind::Opaque( - outbound::proxy_protocol::Opaque { - routes: vec![default_outbound_opaq_route(backend)], - }, - ) + outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque { + routes: vec![default_outbound_opaq_route(backend)], + }) } else { - let mut http_routes = outbound.http_routes.into_iter().collect::>(); - http_routes.sort_by(|(a_name, a_route), (b_name, b_route)| { - let by_ts = match (&a_route.creation_timestamp, &b_route.creation_timestamp) { - (Some(a_ts), Some(b_ts)) => a_ts.cmp(b_ts), - (None, None) => std::cmp::Ordering::Equal, - // Routes with timestamps are preferred over routes without. - (Some(_), None) => return std::cmp::Ordering::Less, - (None, Some(_)) => return std::cmp::Ordering::Greater, - }; - by_ts.then_with(|| a_name.name.cmp(&b_name.name)) + let accrual = outbound.accrual.map(|accrual| outbound::FailureAccrual { + kind: Some(match accrual { + linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive { + max_failures, + backoff, + } => outbound::failure_accrual::Kind::ConsecutiveFailures( + outbound::failure_accrual::ConsecutiveFailures { + max_failures, + backoff: Some(outbound::ExponentialBackoff { + min_backoff: convert_duration("min_backoff", backoff.min_penalty), + max_backoff: convert_duration("max_backoff", backoff.max_penalty), + jitter_ratio: backoff.jitter, + }), + }, + ), + }), }); - let mut http_routes: Vec<_> = http_routes - .into_iter() - .map(|(gknn, route)| convert_outbound_http_route(gknn, route, backend.clone())) - .collect(); - - if http_routes.is_empty() { - http_routes = vec![default_outbound_http_route(backend.clone())]; - } + match outbound.routes { + OutboundRouteCollection::Empty => { + let routes = vec![default_outbound_http_route(backend.clone())]; + + outbound::proxy_protocol::Kind::Detect(outbound::proxy_protocol::Detect { + timeout: Some( + time::Duration::from_secs(10) + .try_into() + .expect("failed to convert detect timeout to protobuf"), + ), + opaque: Some(outbound::proxy_protocol::Opaque { + routes: vec![default_outbound_opaq_route(backend)], + }), + http1: Some(outbound::proxy_protocol::Http1 { + routes: routes.clone(), + failure_accrual: accrual.clone(), + }), + http2: Some(outbound::proxy_protocol::Http2 { + routes, + failure_accrual: accrual, + }), + }) + } + OutboundRouteCollection::Http(routes) => { + let routes = routes + .into_iter() + .sorted_by(timestamp_then_name) + .map(|(gknn, route)| convert_outbound_http_route(gknn, route, backend.clone())) + .collect::>(); - let accrual = - outbound - .accrual - .map(|accrual| linkerd2_proxy_api::outbound::FailureAccrual { - kind: Some(match accrual { - linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive { - max_failures, - backoff, - } => outbound::failure_accrual::Kind::ConsecutiveFailures( - outbound::failure_accrual::ConsecutiveFailures { - max_failures, - backoff: Some(outbound::ExponentialBackoff { - min_backoff: convert_duration( - "min_backoff", - backoff.min_penalty, - ), - max_backoff: convert_duration( - "max_backoff", - backoff.max_penalty, - ), - jitter_ratio: backoff.jitter, - }), - }, - ), + outbound::proxy_protocol::Kind::Detect(outbound::proxy_protocol::Detect { + timeout: Some( + time::Duration::from_secs(10) + .try_into() + .expect("failed to convert detect timeout to protobuf"), + ), + opaque: Some(outbound::proxy_protocol::Opaque { + routes: vec![default_outbound_opaq_route(backend)], }), - }); - - linkerd2_proxy_api::outbound::proxy_protocol::Kind::Detect( - outbound::proxy_protocol::Detect { - timeout: Some( - time::Duration::from_secs(10) - .try_into() - .expect("failed to convert detect timeout to protobuf"), - ), - opaque: Some(outbound::proxy_protocol::Opaque { - routes: vec![default_outbound_opaq_route(backend)], - }), - http1: Some(outbound::proxy_protocol::Http1 { - routes: http_routes.clone(), - failure_accrual: accrual.clone(), - }), - http2: Some(outbound::proxy_protocol::Http2 { - routes: http_routes, - failure_accrual: accrual, - }), - }, - ) + http1: Some(outbound::proxy_protocol::Http1 { + routes: routes.clone(), + failure_accrual: accrual.clone(), + }), + http2: Some(outbound::proxy_protocol::Http2 { + routes, + failure_accrual: accrual, + }), + }) + } + } }; let metadata = Metadata { @@ -295,13 +294,31 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy { } } +fn timestamp_then_name( + (left_id, left_route): &(GroupKindNamespaceName, OutboundRoute), + (right_id, right_route): &(GroupKindNamespaceName, OutboundRoute), +) -> std::cmp::Ordering { + let by_ts = match ( + &left_route.creation_timestamp, + &right_route.creation_timestamp, + ) { + (Some(left_ts), Some(right_ts)) => left_ts.cmp(right_ts), + (None, None) => std::cmp::Ordering::Equal, + // Routes with timestamps are preferred over routes without. + (Some(_), None) => return std::cmp::Ordering::Less, + (None, Some(_)) => return std::cmp::Ordering::Greater, + }; + + by_ts.then_with(|| left_id.name.cmp(&right_id.name)) +} + fn convert_outbound_http_route( gknn: GroupKindNamespaceName, - HttpRoute { + OutboundRoute { hostnames, rules, creation_timestamp: _, - }: HttpRoute, + }: OutboundRoute, backend: outbound::Backend, ) -> outbound::HttpRoute { let metadata = Some(Metadata { @@ -322,7 +339,7 @@ fn convert_outbound_http_route( let rules = rules .into_iter() .map( - |HttpRouteRule { + |OutboundRouteRule { matches, backends, request_timeout, @@ -356,7 +373,7 @@ fn convert_outbound_http_route( .map(routes::http::convert_match) .collect(), backends: Some(outbound::http_route::Distribution { kind: Some(dist) }), - filters: filters.into_iter().map(convert_filter).collect(), + filters: filters.into_iter().map(convert_to_http_filter).collect(), request_timeout: request_timeout .and_then(|d| convert_duration("request timeout", d)), } @@ -399,7 +416,11 @@ fn convert_http_backend( } Backend::Service(svc) => { if svc.exists { - let filters = svc.filters.into_iter().map(convert_filter).collect(); + let filters = svc + .filters + .into_iter() + .map(convert_to_http_filter) + .collect(); outbound::http_route::WeightedRouteBackend { weight: svc.weight, backend: Some(outbound::http_route::RouteBackend { @@ -595,7 +616,7 @@ fn convert_duration(name: &'static str, duration: time::Duration) -> Option outbound::http_route::Filter { +fn convert_to_http_filter(filter: Filter) -> outbound::http_route::Filter { use outbound::http_route::filter::Kind; outbound::http_route::Filter { @@ -607,6 +628,9 @@ fn convert_filter(filter: Filter) -> outbound::http_route::Filter { Kind::ResponseHeaderModifier(routes::convert_response_header_modifier_filter(f)) } Filter::RequestRedirect(f) => Kind::Redirect(routes::convert_redirect_filter(f)), + Filter::FailureInjector(f) => { + Kind::FailureInjector(routes::http::convert_failure_injector_filter(f)) + } }), } } diff --git a/policy-controller/k8s/index/Cargo.toml b/policy-controller/k8s/index/Cargo.toml index b024213c71c09..24e724e4be3b7 100644 --- a/policy-controller/k8s/index/Cargo.toml +++ b/policy-controller/k8s/index/Cargo.toml @@ -10,7 +10,6 @@ ahash = "0.8" anyhow = "1" futures = { version = "0.3", default-features = false } http = "0.2" -k8s-gateway-api = "0.15" kube = { version = "0.87.1", default-features = false, features = [ "client", "derive", diff --git a/policy-controller/k8s/index/src/inbound/authorization_policy.rs b/policy-controller/k8s/index/src/inbound/authorization_policy.rs index b0ed3b3bdf04a..0598c25fef784 100644 --- a/policy-controller/k8s/index/src/inbound/authorization_policy.rs +++ b/policy-controller/k8s/index/src/inbound/authorization_policy.rs @@ -1,7 +1,7 @@ use anyhow::Result; use linkerd_policy_controller_core::routes::GroupKindName; use linkerd_policy_controller_k8s_api::{ - self as k8s, + self as k8s, gateway as k8s_gateway_api, policy::{LocalTargetRef, NamespacedTargetRef}, ServiceAccount, }; diff --git a/policy-controller/k8s/index/src/inbound/http_route.rs b/policy-controller/k8s/index/src/inbound/http_route.rs index 37b521d1ee300..812e077b7ed46 100644 --- a/policy-controller/k8s/index/src/inbound/http_route.rs +++ b/policy-controller/k8s/index/src/inbound/http_route.rs @@ -1,12 +1,13 @@ -use crate::http_route; use ahash::AHashMap as HashMap; use anyhow::{bail, Error, Result}; use k8s_gateway_api as api; -use linkerd_policy_controller_core::inbound::{Filter, HttpRoute, HttpRouteRule}; -use linkerd_policy_controller_core::routes::{HttpRouteMatch, Method}; -use linkerd_policy_controller_core::POLICY_CONTROLLER_NAME; +use linkerd_policy_controller_core::{ + inbound::{Filter, HttpRoute, HttpRouteRule}, + routes::{HttpRouteMatch, Method}, + POLICY_CONTROLLER_NAME, +}; use linkerd_policy_controller_k8s_api::{ - self as k8s, gateway, + self as k8s, gateway, gateway as k8s_gateway_api, policy::{httproute as policy, Server}, }; use std::fmt; @@ -64,7 +65,7 @@ impl TryFrom for RouteBinding { .hostnames .into_iter() .flatten() - .map(http_route::host_match) + .map(crate::routes::http::host_match) .collect(); let rules = route @@ -110,7 +111,7 @@ impl TryFrom for RouteBinding { .hostnames .into_iter() .flatten() - .map(http_route::host_match) + .map(crate::routes::http::host_match) .collect(); let rules = route @@ -169,18 +170,18 @@ impl RouteBinding { method, }: api::HttpRouteMatch, ) -> Result { - let path = path.map(http_route::path_match).transpose()?; + let path = path.map(crate::routes::http::path_match).transpose()?; let headers = headers .into_iter() .flatten() - .map(http_route::header_match) + .map(crate::routes::http::header_match) .collect::>()?; let query_params = query_params .into_iter() .flatten() - .map(http_route::query_param_match) + .map(crate::routes::http::query_param_match) .collect::>()?; let method = method.as_deref().map(Method::try_from).transpose()?; @@ -218,19 +219,19 @@ impl RouteBinding { api::HttpRouteFilter::RequestHeaderModifier { request_header_modifier, } => { - let filter = http_route::header_modifier(request_header_modifier)?; + let filter = crate::routes::http::header_modifier(request_header_modifier)?; Filter::RequestHeaderModifier(filter) } api::HttpRouteFilter::ResponseHeaderModifier { response_header_modifier, } => { - let filter = http_route::header_modifier(response_header_modifier)?; + let filter = crate::routes::http::header_modifier(response_header_modifier)?; Filter::ResponseHeaderModifier(filter) } api::HttpRouteFilter::RequestRedirect { request_redirect } => { - let filter = http_route::req_redirect(request_redirect)?; + let filter = crate::routes::http::req_redirect(request_redirect)?; Filter::RequestRedirect(filter) } @@ -252,19 +253,19 @@ impl RouteBinding { policy::HttpRouteFilter::RequestHeaderModifier { request_header_modifier, } => { - let filter = http_route::header_modifier(request_header_modifier)?; + let filter = crate::routes::http::header_modifier(request_header_modifier)?; Filter::RequestHeaderModifier(filter) } policy::HttpRouteFilter::ResponseHeaderModifier { response_header_modifier, } => { - let filter = http_route::header_modifier(response_header_modifier)?; + let filter = crate::routes::http::header_modifier(response_header_modifier)?; Filter::ResponseHeaderModifier(filter) } policy::HttpRouteFilter::RequestRedirect { request_redirect } => { - let filter = http_route::req_redirect(request_redirect)?; + let filter = crate::routes::http::req_redirect(request_redirect)?; Filter::RequestRedirect(filter) } }; diff --git a/policy-controller/k8s/index/src/inbound/index.rs b/policy-controller/k8s/index/src/inbound/index.rs index d7771dff6dd55..e48cd651bb9a1 100644 --- a/policy-controller/k8s/index/src/inbound/index.rs +++ b/policy-controller/k8s/index/src/inbound/index.rs @@ -11,8 +11,8 @@ use super::{ server, server_authorization, workload, }; use crate::{ - http_route::{gkn_for_gateway_http_route, gkn_for_linkerd_http_route, gkn_for_resource}, ports::{PortHasher, PortMap, PortSet}, + routes::{ExplicitGKN, ImpliedGKN}, ClusterInfo, DefaultPolicy, }; use ahash::{AHashMap as HashMap, AHashSet as HashSet}; @@ -26,7 +26,8 @@ use linkerd_policy_controller_core::{ IdentityMatch, Ipv4Net, Ipv6Net, NetworkMatch, }; use linkerd_policy_controller_k8s_api::{ - self as k8s, policy::server::Port, policy::server::Selector, ResourceExt, + self as k8s, gateway as k8s_gateway_api, policy::server::Port, policy::server::Selector, + ResourceExt, }; use parking_lot::RwLock; use std::{ @@ -271,7 +272,7 @@ impl Index { { let ns = route.namespace().expect("HttpRoute must have a namespace"); let name = route.name_unchecked(); - let gkn = gkn_for_resource(&route); + let gkn = route.gkn(); let _span = info_span!("apply", %ns, %name).entered(); let route_binding = match route.try_into() { @@ -300,7 +301,7 @@ impl Index { for route in routes.into_iter() { let namespace = route.namespace().expect("HttpRoute must be namespaced"); let name = route.name_unchecked(); - let gkn = gkn_for_resource(&route); + let gkn = route.gkn(); let route_binding = match route.try_into() { Ok(binding) => binding, Err(error) => { @@ -853,7 +854,7 @@ impl kubert::index::IndexNamespacedResource for Index { } fn delete(&mut self, ns: String, name: String) { - let gkn = gkn_for_linkerd_http_route(name); + let gkn = name.gkn::(); self.delete_route(ns, gkn) } @@ -872,7 +873,7 @@ impl kubert::index::IndexNamespacedResource for Inde } fn delete(&mut self, ns: String, name: String) { - let gkn = gkn_for_gateway_http_route(name); + let gkn = name.gkn::(); self.delete_route(ns, gkn) } diff --git a/policy-controller/k8s/index/src/inbound/tests/authorization_policy.rs b/policy-controller/k8s/index/src/inbound/tests/authorization_policy.rs index 42a3554d106ff..00c289ea64995 100644 --- a/policy-controller/k8s/index/src/inbound/tests/authorization_policy.rs +++ b/policy-controller/k8s/index/src/inbound/tests/authorization_policy.rs @@ -1,6 +1,7 @@ use super::*; use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use linkerd_policy_controller_core::{inbound, routes}; +use linkerd_policy_controller_k8s_api::gateway as k8s_gateway_api; #[test] fn links_authorization_policy_with_mtls_name() { @@ -329,12 +330,12 @@ fn authorization_policy_prevents_index_deletion() { test.index.write().apply(net_authn.clone()); // Now we delete the server, and HTTPRoute. - >::delete( + >::delete( &mut test.index.write(), "ns-0".to_string(), "srv-8080".to_string(), ); - >::delete( + >::delete( &mut test.index.write(), "ns-0".to_string(), "route-foo".to_string(), diff --git a/policy-controller/k8s/index/src/inbound/tests/http_routes.rs b/policy-controller/k8s/index/src/inbound/tests/http_routes.rs index 95bd6c686d66c..f34b92af47244 100644 --- a/policy-controller/k8s/index/src/inbound/tests/http_routes.rs +++ b/policy-controller/k8s/index/src/inbound/tests/http_routes.rs @@ -1,9 +1,10 @@ use super::*; -use crate::http_route::gkn_for_linkerd_http_route; +use crate::routes::ExplicitGKN; use linkerd_policy_controller_core::{ routes::{HttpRouteMatch, Method, PathMatch}, POLICY_CONTROLLER_NAME, }; +use linkerd_policy_controller_k8s_api::policy; const POLICY_API_GROUP: &str = "policy.linkerd.io"; @@ -55,9 +56,9 @@ fn route_attaches_to_server() { assert!(rx .borrow_and_update() .http_routes - .contains_key(&HttpRouteRef::Linkerd(gkn_for_linkerd_http_route( - "route-foo".to_string() - )))); + .contains_key(&HttpRouteRef::Linkerd( + "route-foo".gkn::() + ))); // Create authz policy. test.index.write().apply(mk_authorization_policy( @@ -74,7 +75,7 @@ fn route_attaches_to_server() { assert!(rx.has_changed().unwrap()); assert!(rx.borrow().http_routes - [&HttpRouteRef::Linkerd(gkn_for_linkerd_http_route("route-foo".to_string()))] + [&HttpRouteRef::Linkerd("route-foo".gkn::())] .authorizations .contains_key(&AuthorizationRef::AuthorizationPolicy( "authz-foo".to_string() diff --git a/policy-controller/k8s/index/src/lib.rs b/policy-controller/k8s/index/src/lib.rs index fae5f8132cc2a..6a87177871eca 100644 --- a/policy-controller/k8s/index/src/lib.rs +++ b/policy-controller/k8s/index/src/lib.rs @@ -7,7 +7,7 @@ //! objects. //! - Each `Server` selects over pods in the same namespace. //! - Each `ServerAuthorization` selects over `Server` instances in the same namespace. When a -//! `ServerAuthorization` is updated, we find all of the `Server` instances it selects and update +//! `ServerAuthorization` is updated, we find all the `Server` instances it selects and update //! their authorizations and publishes these updates on the server's broadcast channel. //! //! ```text @@ -25,10 +25,10 @@ mod cluster_info; mod defaults; -pub mod http_route; pub mod inbound; pub mod outbound; pub mod ports; +pub mod routes; pub use cluster_info::ClusterInfo; pub use defaults::DefaultPolicy; diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index 0f14bb229a688..e36c65150d495 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -1,19 +1,21 @@ use crate::{ - http_route::{self, gkn_for_gateway_http_route, gkn_for_linkerd_http_route, HttpRouteResource}, ports::{ports_annotation, PortSet}, + routes::{self, ExplicitGKN, RouteResource}, ClusterInfo, }; use ahash::AHashMap as HashMap; use anyhow::{bail, ensure, Result}; -use k8s_gateway_api::{BackendObjectReference, HttpBackendRef, ParentReference}; use linkerd_policy_controller_core::{ outbound::{ - Backend, Backoff, FailureAccrual, Filter, HttpRoute, HttpRouteRule, OutboundPolicy, - WeightedService, + Backend, Backoff, FailureAccrual, Filter, OutboundPolicy, OutboundRoute, + OutboundRouteCollection, OutboundRouteRule, TypedOutboundRoute, WeightedService, }, - routes::GroupKindNamespaceName, + routes::{GroupKindNamespaceName, HttpRouteMatch}, +}; +use linkerd_policy_controller_k8s_api::{ + gateway::{self as k8s_gateway_api, BackendObjectReference, HttpBackendRef, ParentReference}, + policy as linkerd_k8s_api, ResourceExt, Service, Time, }; -use linkerd_policy_controller_k8s_api::{policy as api, ResourceExt, Service, Time}; use parking_lot::RwLock; use std::{hash::Hash, net::IpAddr, num::NonZeroU16, sync::Arc, time}; use tokio::sync::watch; @@ -50,7 +52,7 @@ struct Namespace { service_port_routes: HashMap, /// Stores the route resources (by service name) that do not /// explicitly target a port. - service_routes: HashMap>, + service_routes: HashMap, namespace: Arc, } @@ -81,17 +83,19 @@ struct ServiceRoutes { struct RoutesWatch { opaque: bool, accrual: Option, - routes: HashMap, + routes: OutboundRouteCollection, watch: watch::Sender, } -impl kubert::index::IndexNamespacedResource for Index { - fn apply(&mut self, route: api::HttpRoute) { - self.apply(HttpRouteResource::Linkerd(route)) +impl kubert::index::IndexNamespacedResource for Index { + fn apply(&mut self, route: linkerd_k8s_api::HttpRoute) { + self.apply(RouteResource::LinkerdHttp(route)) } fn delete(&mut self, namespace: String, name: String) { - let gknn = gkn_for_linkerd_http_route(name).namespaced(namespace); + let gknn = name + .gkn::() + .namespaced(namespace); tracing::debug!(?gknn, "deleting route"); for ns_index in self.namespaces.by_ns.values_mut() { ns_index.delete(&gknn); @@ -101,12 +105,13 @@ impl kubert::index::IndexNamespacedResource for Index { impl kubert::index::IndexNamespacedResource for Index { fn apply(&mut self, route: k8s_gateway_api::HttpRoute) { - self.apply(HttpRouteResource::Gateway(route)) + self.apply(RouteResource::GatewayHttp(route)) } fn delete(&mut self, namespace: String, name: String) { - let gknn = gkn_for_gateway_http_route(name).namespaced(namespace); - tracing::debug!(?gknn, "deleting route"); + let gknn = name + .gkn::() + .namespaced(namespace); for ns_index in self.namespaces.by_ns.values_mut() { ns_index.delete(&gknn); } @@ -209,18 +214,23 @@ impl Index { .by_ns .entry(service_namespace.clone()) .or_insert_with(|| Namespace { + namespace: Arc::new(service_namespace.to_string()), service_routes: Default::default(), service_port_routes: Default::default(), - namespace: Arc::new(service_namespace.to_string()), }); + let key = ServicePort { service: service_name, port: service_port, }; + tracing::debug!(?key, "subscribing to service port"); + let routes = ns.service_routes_or_default(key, &self.namespaces.cluster_info, &self.service_info); + let watch = routes.watch_for_ns_or_default(source_namespace); + Ok(watch.watch.subscribe()) } @@ -228,27 +238,30 @@ impl Index { self.services_by_ip.get(&addr).cloned() } - fn apply(&mut self, route: HttpRouteResource) { + fn apply(&mut self, route: RouteResource) { tracing::debug!(name = route.name(), "indexing route"); for parent_ref in route.inner().parent_refs.iter().flatten() { if !is_parent_service(parent_ref) { continue; } + if !route_accepted_by_service(route.status(), &parent_ref.name) { continue; } + let ns = parent_ref .namespace .clone() .unwrap_or_else(|| route.namespace()); + self.namespaces .by_ns .entry(ns.clone()) .or_insert_with(|| Namespace { + namespace: Arc::new(ns), service_routes: Default::default(), service_port_routes: Default::default(), - namespace: Arc::new(ns), }) .apply( route.clone(), @@ -269,34 +282,40 @@ impl Index { impl Namespace { fn apply( &mut self, - route: HttpRouteResource, + route: RouteResource, parent_ref: &ParentReference, cluster_info: &ClusterInfo, service_info: &HashMap, ) { tracing::debug!(?route); + let outbound_route = match self.convert_route(route.clone(), cluster_info, service_info) { Ok(route) => route, Err(error) => { - tracing::error!(%error, "failed to convert HttpRoute"); + tracing::error!(%error, "failed to convert route"); return; } }; + tracing::debug!(?outbound_route); let port = parent_ref.port.and_then(NonZeroU16::new); + if let Some(port) = port { let service_port = ServicePort { port, service: parent_ref.name.clone(), }; + tracing::debug!( ?service_port, route = route.name(), "inserting route for service" ); + let service_routes = self.service_routes_or_default(service_port, cluster_info, service_info); + service_routes.apply(route.gknn(), outbound_route); } else { // If the parent_ref doesn't include a port, apply this route @@ -308,42 +327,54 @@ impl Namespace { } }, ); + // Also add the route to the list of routes that target the // Service without specifying a port. self.service_routes .entry(parent_ref.name.clone()) .or_default() - .insert(route.gknn(), outbound_route); + .insert(route.gknn(), outbound_route) + .map_err(|error| tracing::warn!(?error)) + .transpose(); } } fn reindex_services(&mut self, service_info: &HashMap) { + let update_service = |backend: &mut Backend| { + if let Backend::Service(svc) = backend { + let service_ref = ServiceRef { + name: svc.name.clone(), + namespace: svc.namespace.clone(), + }; + svc.exists = service_info.contains_key(&service_ref); + } + }; + for routes in self.service_port_routes.values_mut() { - for routes in routes.watches_by_ns.values_mut() { - for route in routes.routes.values_mut() { - for rule in route.rules.iter_mut() { - for backend in rule.backends.iter_mut() { - if let Backend::Service(svc) = backend { - let service_ref = ServiceRef { - name: svc.name.clone(), - namespace: svc.namespace.clone(), - }; - svc.exists = service_info.contains_key(&service_ref); - } - } + for watch in routes.watches_by_ns.values_mut() { + match &mut watch.routes { + OutboundRouteCollection::Empty => {} + OutboundRouteCollection::Http(routes) => { + routes + .values_mut() + .flat_map(|route| route.rules.iter_mut()) + .flat_map(|rule| rule.backends.iter_mut()) + .for_each(update_service); } } - routes.send_if_modified(); + watch.send_if_modified(); } } } fn update_service(&mut self, name: String, service: &ServiceInfo) { tracing::debug!(?name, ?service, "updating service"); + for (svc_port, svc_routes) in self.service_port_routes.iter_mut() { if svc_port.service != name { continue; } + let opaque = service.opaque_ports.contains(&svc_port.port); svc_routes.update_service(opaque, service.accrual); @@ -354,9 +385,11 @@ impl Namespace { for service in self.service_port_routes.values_mut() { service.delete(gknn); } - for routes in self.service_routes.values_mut() { + + self.service_routes.retain(|_, routes| { routes.remove(gknn); - } + !routes.is_empty() + }); } fn service_routes_or_default( @@ -370,57 +403,38 @@ impl Namespace { .or_insert_with(|| { let authority = cluster.service_dns_authority(&self.namespace, &sp.service, sp.port); + let service_ref = ServiceRef { name: sp.service.clone(), namespace: self.namespace.to_string(), }; + let (opaque, accrual) = match service_info.get(&service_ref) { Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual), None => (false, None), }; - // The HttpRoutes which target this Service but don't specify - // a port apply to all ports. Therefore we include them. - let routes = self - .service_routes - .get(&sp.service) - .cloned() - .unwrap_or_default(); + // The routes which target this Service but don't specify + // a port apply to all ports. Therefore, we include them. + let routes = self.service_routes.get(&sp.service).cloned(); let mut service_routes = ServiceRoutes { opaque, accrual, authority, - namespace: self.namespace.clone(), - name: sp.service, port: sp.port, + name: sp.service, + namespace: self.namespace.clone(), watches_by_ns: Default::default(), }; - // Producer routes are routes in the same namespace as their - // parent service. Consumer routes are routes in other - // namespaces. - let (producer_routes, consumer_routes): (Vec<_>, Vec<_>) = routes - .into_iter() - .partition(|(gknn, _route)| *gknn.namespace == *self.namespace); - for (gknn, route) in consumer_routes { - // Consumer routes should only apply to watches from the - // consumer namespace. - let watch = service_routes.watch_for_ns_or_default(gknn.namespace.to_string()); - watch.routes.insert(gknn, route); - } - for (gknn, route) in producer_routes { - // Insert the route into the producer namespace. - let watch = service_routes.watch_for_ns_or_default(gknn.namespace.to_string()); - watch.routes.insert(gknn.clone(), route.clone()); - // Producer routes apply to clients in all namespaces, so - // apply it to watches for all other namespaces too. - for (ns, watch) in service_routes.watches_by_ns.iter_mut() { - if ns != &gknn.namespace { - watch.routes.insert(gknn.clone(), route.clone()); - } + match routes { + None | Some(OutboundRouteCollection::Empty) => {} + Some(OutboundRouteCollection::Http(routes)) => { + service_routes + .insert_producer_and_consumer_routes(routes, self.namespace.as_str()); } - } + }; service_routes }) @@ -428,18 +442,18 @@ impl Namespace { fn convert_route( &self, - route: HttpRouteResource, + route: RouteResource, cluster: &ClusterInfo, service_info: &HashMap, - ) -> Result { + ) -> Result { match route { - HttpRouteResource::Linkerd(route) => { + RouteResource::LinkerdHttp(route) => { let hostnames = route .spec .hostnames .into_iter() .flatten() - .map(http_route::host_match) + .map(routes::http::host_match) .collect(); let rules = route @@ -452,19 +466,19 @@ impl Namespace { let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t); - Ok(HttpRoute { + Ok(TypedOutboundRoute::Http(OutboundRoute { hostnames, rules, creation_timestamp, - }) + })) } - HttpRouteResource::Gateway(route) => { + RouteResource::GatewayHttp(route) => { let hostnames = route .spec .hostnames .into_iter() .flatten() - .map(http_route::host_match) + .map(routes::http::host_match) .collect(); let rules = route @@ -472,31 +486,31 @@ impl Namespace { .rules .into_iter() .flatten() - .map(|r| self.convert_gateway_rule(r, cluster, service_info)) + .map(|r| self.convert_gateway_http_rule(r, cluster, service_info)) .collect::>()?; let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t); - Ok(HttpRoute { + Ok(TypedOutboundRoute::Http(OutboundRoute { hostnames, rules, creation_timestamp, - }) + })) } } } fn convert_linkerd_rule( &self, - rule: api::httproute::HttpRouteRule, + rule: linkerd_k8s_api::httproute::HttpRouteRule, cluster: &ClusterInfo, service_info: &HashMap, - ) -> Result { + ) -> Result> { let matches = rule .matches .into_iter() .flatten() - .map(http_route::try_match) + .map(routes::http::try_match) .collect::>()?; let backends = rule @@ -524,21 +538,20 @@ impl Namespace { Some(timeout) }); - let backend_request_timeout = - rule.timeouts - .as_ref() - .and_then(|timeouts: &api::httproute::HttpRouteTimeouts| { - let timeout = time::Duration::from(timeouts.backend_request?); + let backend_request_timeout = rule.timeouts.as_ref().and_then( + |timeouts: &linkerd_k8s_api::httproute::HttpRouteTimeouts| { + let timeout = time::Duration::from(timeouts.backend_request?); - // zero means "no timeout", per GEP-1742 - if timeout == time::Duration::from_nanos(0) { - return None; - } + // zero means "no timeout", per GEP-1742 + if timeout == time::Duration::from_nanos(0) { + return None; + } - Some(timeout) - }); + Some(timeout) + }, + ); - Ok(HttpRouteRule { + Ok(OutboundRouteRule { matches, backends, request_timeout, @@ -547,17 +560,17 @@ impl Namespace { }) } - fn convert_gateway_rule( + fn convert_gateway_http_rule( &self, rule: k8s_gateway_api::HttpRouteRule, cluster: &ClusterInfo, service_info: &HashMap, - ) -> Result { + ) -> Result> { let matches = rule .matches .into_iter() .flatten() - .map(http_route::try_match) + .map(routes::http::try_match) .collect::>()?; let backends = rule @@ -574,7 +587,7 @@ impl Namespace { .map(convert_gateway_filter) .collect::>()?; - Ok(HttpRouteRule { + Ok(OutboundRouteRule { matches, backends, request_timeout: None, @@ -584,12 +597,13 @@ impl Namespace { } } -fn convert_backend( +fn convert_backend>( ns: &str, - backend: HttpBackendRef, + backend: BackendRef, cluster: &ClusterInfo, services: &HashMap, ) -> Option { + let backend = backend.into(); let filters = backend.filters; let backend = backend.backend_ref?; if !is_backend_service(&backend.inner) { @@ -653,48 +667,51 @@ fn convert_backend( })) } -fn convert_linkerd_filter(filter: api::httproute::HttpRouteFilter) -> Result { +fn convert_linkerd_filter(filter: linkerd_k8s_api::httproute::HttpRouteFilter) -> Result { let filter = match filter { - api::httproute::HttpRouteFilter::RequestHeaderModifier { + linkerd_k8s_api::httproute::HttpRouteFilter::RequestHeaderModifier { request_header_modifier, } => { - let filter = http_route::header_modifier(request_header_modifier)?; + let filter = routes::http::header_modifier(request_header_modifier)?; Filter::RequestHeaderModifier(filter) } - api::httproute::HttpRouteFilter::ResponseHeaderModifier { + linkerd_k8s_api::httproute::HttpRouteFilter::ResponseHeaderModifier { response_header_modifier, } => { - let filter = http_route::header_modifier(response_header_modifier)?; + let filter = routes::http::header_modifier(response_header_modifier)?; Filter::RequestHeaderModifier(filter) } - api::httproute::HttpRouteFilter::RequestRedirect { request_redirect } => { - let filter = http_route::req_redirect(request_redirect)?; + linkerd_k8s_api::httproute::HttpRouteFilter::RequestRedirect { request_redirect } => { + let filter = routes::http::req_redirect(request_redirect)?; Filter::RequestRedirect(filter) } }; Ok(filter) } -fn convert_gateway_filter(filter: k8s_gateway_api::HttpRouteFilter) -> Result { +fn convert_gateway_filter>( + filter: RouteFilter, +) -> Result { + let filter = filter.into(); let filter = match filter { k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { request_header_modifier, } => { - let filter = http_route::header_modifier(request_header_modifier)?; + let filter = routes::http::header_modifier(request_header_modifier)?; Filter::RequestHeaderModifier(filter) } k8s_gateway_api::HttpRouteFilter::ResponseHeaderModifier { response_header_modifier, } => { - let filter = http_route::header_modifier(response_header_modifier)?; + let filter = routes::http::header_modifier(response_header_modifier)?; Filter::ResponseHeaderModifier(filter) } k8s_gateway_api::HttpRouteFilter::RequestRedirect { request_redirect } => { - let filter = http_route::req_redirect(request_redirect)?; + let filter = routes::http::req_redirect(request_redirect)?; Filter::RequestRedirect(filter) } k8s_gateway_api::HttpRouteFilter::RequestMirror { .. } => { @@ -760,51 +777,52 @@ fn is_service(group: Option<&str>, kind: &str) -> bool { impl ServiceRoutes { fn watch_for_ns_or_default(&mut self, namespace: String) -> &mut RoutesWatch { // The routes from the producer namespace apply to watches in all - // namespaces so we copy them. + // namespaces, so we copy them. let routes = self .watches_by_ns .get(self.namespace.as_ref()) .map(|watch| watch.routes.clone()) .unwrap_or_default(); + self.watches_by_ns.entry(namespace).or_insert_with(|| { let (sender, _) = watch::channel(OutboundPolicy { - http_routes: routes.clone(), - authority: self.authority.clone(), - name: self.name.to_string(), - namespace: self.namespace.to_string(), port: self.port, opaque: self.opaque, accrual: self.accrual, + routes: routes.clone(), + name: self.name.to_string(), + authority: self.authority.clone(), + namespace: self.namespace.to_string(), }); + RoutesWatch { - opaque: self.opaque, - accrual: self.accrual, routes, watch: sender, + opaque: self.opaque, + accrual: self.accrual, } }) } - fn apply(&mut self, gknn: GroupKindNamespaceName, route: HttpRoute) { + fn apply(&mut self, gknn: GroupKindNamespaceName, route: TypedOutboundRoute) { if *gknn.namespace == *self.namespace { // This is a producer namespace route. let watch = self.watch_for_ns_or_default(gknn.namespace.to_string()); - watch.routes.insert(gknn.clone(), route.clone()); - watch.send_if_modified(); + + watch.insert_route(gknn.clone(), route.clone()); + // Producer routes apply to clients in all namespaces, so // apply it to watches for all other namespaces too. - for (ns, watch) in self.watches_by_ns.iter_mut() { + for (ns, ns_watch) in self.watches_by_ns.iter_mut() { if ns != &gknn.namespace { - watch.routes.insert(gknn.clone(), route.clone()); - watch.send_if_modified(); + ns_watch.insert_route(gknn.clone(), route.clone()); } } } else { // This is a consumer namespace route and should only apply to // watches from that namespace. let watch = self.watch_for_ns_or_default(gknn.namespace.to_string()); - watch.routes.insert(gknn, route); - watch.send_if_modified(); + watch.insert_route(gknn, route); } } @@ -820,8 +838,44 @@ impl ServiceRoutes { fn delete(&mut self, gknn: &GroupKindNamespaceName) { for watch in self.watches_by_ns.values_mut() { - watch.routes.remove(gknn); - watch.send_if_modified(); + watch.remove_route(gknn); + } + } + + fn insert_producer_and_consumer_routes>( + &mut self, + routes: HashMap, + namespace: &str, + ) { + // Producer routes are routes in the same namespace as + // their parent service. Consumer routes are routes in + // other namespaces. + let (producer_routes, consumer_routes): (Vec<_>, Vec<_>) = routes + .into_iter() + .partition(|(gknn, _)| gknn.namespace.as_ref() == namespace); + + for (consumer_gknn, consumer_route) in consumer_routes { + // Consumer routes should only apply to watches from the + // consumer namespace. + let consumer_watch = self.watch_for_ns_or_default(consumer_gknn.namespace.to_string()); + + consumer_watch.insert_route(consumer_gknn.clone(), consumer_route.clone()); + } + + for (producer_gknn, producer_route) in producer_routes { + // Insert the route into the producer namespace. + let producer_watch = self.watch_for_ns_or_default(producer_gknn.namespace.to_string()); + + producer_watch.insert_route(producer_gknn.clone(), producer_route.clone()); + + // Producer routes apply to clients in all namespaces, so + // apply it to watches for all other namespaces too. + self.watches_by_ns + .iter_mut() + .filter(|(namespace, _)| namespace.as_str() != producer_gknn.namespace.as_ref()) + .for_each(|(_, watch)| { + watch.insert_route(producer_gknn.clone(), producer_route.clone()) + }); } } } @@ -830,21 +884,43 @@ impl RoutesWatch { fn send_if_modified(&mut self) { self.watch.send_if_modified(|policy| { let mut modified = false; - if self.routes != policy.http_routes { - policy.http_routes = self.routes.clone(); + + if self.routes != policy.routes { + policy.routes = self.routes.clone(); modified = true; } + if self.opaque != policy.opaque { policy.opaque = self.opaque; modified = true; } + if self.accrual != policy.accrual { policy.accrual = self.accrual; modified = true; } + modified }); } + + fn insert_route>( + &mut self, + gknn: GroupKindNamespaceName, + route: Route, + ) { + self.routes + .insert(gknn, route) + .map_err(|error| tracing::warn!(?error)) + .transpose(); + + self.send_if_modified(); + } + + fn remove_route(&mut self, gknn: &GroupKindNamespaceName) { + self.routes.remove(gknn); + self.send_if_modified(); + } } fn parse_accrual_config( @@ -917,7 +993,7 @@ fn parse_duration(s: &str) -> Result { "m" => 1000 * 60, "h" => 1000 * 60 * 60, "d" => 1000 * 60 * 60 * 24, - _ => anyhow::bail!( + _ => bail!( "invalid duration unit {} (expected one of 'ms', 's', 'm', 'h', or 'd')", unit ), diff --git a/policy-controller/k8s/index/src/outbound/tests.rs b/policy-controller/k8s/index/src/outbound/tests.rs index 28f8a096bc0e0..552eaf05d1be4 100644 --- a/policy-controller/k8s/index/src/outbound/tests.rs +++ b/policy-controller/k8s/index/src/outbound/tests.rs @@ -10,7 +10,7 @@ use linkerd_policy_controller_core::IpNet; use linkerd_policy_controller_k8s_api::{self as k8s}; use tokio::time; -mod http_routes; +mod routes; struct TestConfig { index: SharedIndex, diff --git a/policy-controller/k8s/index/src/outbound/tests/routes.rs b/policy-controller/k8s/index/src/outbound/tests/routes.rs new file mode 100644 index 0000000000000..8074a0f70766b --- /dev/null +++ b/policy-controller/k8s/index/src/outbound/tests/routes.rs @@ -0,0 +1 @@ +mod http; diff --git a/policy-controller/k8s/index/src/outbound/tests/http_routes.rs b/policy-controller/k8s/index/src/outbound/tests/routes/http.rs similarity index 83% rename from policy-controller/k8s/index/src/outbound/tests/http_routes.rs rename to policy-controller/k8s/index/src/outbound/tests/routes/http.rs index bc9cd76bf46f8..6ad80286f164c 100644 --- a/policy-controller/k8s/index/src/outbound/tests/http_routes.rs +++ b/policy-controller/k8s/index/src/outbound/tests/routes/http.rs @@ -1,21 +1,23 @@ -use k8s_gateway_api::BackendRef; use kube::Resource; use linkerd_policy_controller_core::{ - outbound::{Backend, WeightedService}, + outbound::{Backend, OutboundRouteCollection, WeightedService}, routes::GroupKindNamespaceName, POLICY_CONTROLLER_NAME, }; +use linkerd_policy_controller_k8s_api::gateway::BackendRef; use tracing::Level; -use super::*; +use super::super::*; #[test] fn backend_service() { tracing_subscriber::fmt() .with_max_level(Level::TRACE) - .init(); + .try_init() + .ok(); let test = TestConfig::default(); + // Create apex service. let apex = mk_service("ns", "apex", 8080); test.index.write().apply(apex); @@ -37,8 +39,12 @@ fn backend_service() { { let policy = rx.borrow_and_update(); - let backend = policy - .http_routes + let backend = Some(&policy.routes) + .map(|routes| match routes { + OutboundRouteCollection::Http(routes) => routes, + _ => panic!("expected http route collection"), + }) + .unwrap() .get(&GroupKindNamespaceName { group: k8s::policy::HttpRoute::group(&()), kind: k8s::policy::HttpRoute::kind(&()), @@ -52,10 +58,13 @@ fn backend_service() { .backends .first() .expect("backend should exist"); + let exists = match backend { + Backend::Invalid { .. } => &false, Backend::Service(WeightedService { exists, .. }) => exists, - _ => panic!("backend should be a service"), + _ => panic!("backend should be a service, but got {backend:?}"), }; + // Backend should not exist. assert!(!exists); } @@ -67,8 +76,12 @@ fn backend_service() { { let policy = rx.borrow_and_update(); - let backend = policy - .http_routes + let backend = Some(&policy.routes) + .map(|routes| match routes { + OutboundRouteCollection::Http(routes) => routes, + _ => panic!("expected http route collection"), + }) + .unwrap() .get(&GroupKindNamespaceName { group: k8s::policy::HttpRoute::group(&()), kind: k8s::policy::HttpRoute::kind(&()), @@ -82,10 +95,12 @@ fn backend_service() { .backends .first() .expect("backend should exist"); + let exists = match backend { Backend::Service(WeightedService { exists, .. }) => exists, - _ => panic!("backend should be a service"), + backend => panic!("backend should be a service, but got {:?}", backend), }; + // Backend should exist. assert!(exists); } @@ -146,8 +161,8 @@ fn mk_route( timeouts: None, }]), }, - status: Some(k8s::policy::httproute::HttpRouteStatus { - inner: k8s::gateway::RouteStatus { + status: Some(HttpRouteStatus { + inner: RouteStatus { parents: vec![k8s::gateway::RouteParentStatus { parent_ref: ParentReference { group: Some("core".to_string()), @@ -159,7 +174,7 @@ fn mk_route( }, controller_name: POLICY_CONTROLLER_NAME.to_string(), conditions: vec![k8s::Condition { - last_transition_time: k8s::Time(chrono::DateTime::::MIN_UTC), + last_transition_time: Time(chrono::DateTime::::MIN_UTC), message: "".to_string(), observed_generation: None, reason: "Accepted".to_string(), diff --git a/policy-controller/k8s/index/src/routes.rs b/policy-controller/k8s/index/src/routes.rs new file mode 100644 index 0000000000000..1c9d07ad7227b --- /dev/null +++ b/policy-controller/k8s/index/src/routes.rs @@ -0,0 +1,82 @@ +use linkerd_policy_controller_core::routes::{GroupKindName, GroupKindNamespaceName}; +use linkerd_policy_controller_k8s_api::{gateway as api, policy, Resource, ResourceExt}; + +pub mod http; + +#[derive(Debug, Clone)] +pub(crate) enum RouteResource { + LinkerdHttp(policy::HttpRoute), + GatewayHttp(api::HttpRoute), +} + +impl RouteResource { + pub(crate) fn name(&self) -> String { + match self { + RouteResource::LinkerdHttp(route) => route.name_unchecked(), + RouteResource::GatewayHttp(route) => route.name_unchecked(), + } + } + + pub(crate) fn namespace(&self) -> String { + match self { + RouteResource::LinkerdHttp(route) => { + route.namespace().expect("HttpRoute must have a namespace") + } + RouteResource::GatewayHttp(route) => { + route.namespace().expect("HttpRoute must have a namespace") + } + } + } + + pub(crate) fn inner(&self) -> &api::CommonRouteSpec { + match self { + RouteResource::LinkerdHttp(route) => &route.spec.inner, + RouteResource::GatewayHttp(route) => &route.spec.inner, + } + } + + pub(crate) fn status(&self) -> Option<&api::RouteStatus> { + match self { + RouteResource::LinkerdHttp(route) => route.status.as_ref().map(|status| &status.inner), + RouteResource::GatewayHttp(route) => route.status.as_ref().map(|status| &status.inner), + } + } + + pub(crate) fn gknn(&self) -> GroupKindNamespaceName { + match self { + RouteResource::LinkerdHttp(route) => route + .gkn() + .namespaced(route.namespace().expect("Route must have namespace")), + RouteResource::GatewayHttp(route) => route + .gkn() + .namespaced(route.namespace().expect("Route must have namespace")), + } + } +} + +pub trait ExplicitGKN { + fn gkn>(&self) -> GroupKindName; +} +pub trait ImpliedGKN { + fn gkn(&self) -> GroupKindName; +} + +impl> ImpliedGKN for R { + fn gkn(&self) -> GroupKindName { + let (kind, group, name) = ( + Self::kind(&()), + Self::group(&()), + self.name_unchecked().into(), + ); + + GroupKindName { group, kind, name } + } +} + +impl ExplicitGKN for str { + fn gkn>(&self) -> GroupKindName { + let (kind, group, name) = (R::kind(&()), R::group(&()), self.to_string().into()); + + GroupKindName { group, kind, name } + } +} diff --git a/policy-controller/k8s/index/src/http_route.rs b/policy-controller/k8s/index/src/routes/http.rs similarity index 58% rename from policy-controller/k8s/index/src/http_route.rs rename to policy-controller/k8s/index/src/routes/http.rs index 199d6e5617a16..d01839f6f566a 100644 --- a/policy-controller/k8s/index/src/http_route.rs +++ b/policy-controller/k8s/index/src/routes/http.rs @@ -1,59 +1,8 @@ use anyhow::{anyhow, bail, Result}; -use k8s_gateway_api as api; -use kube::{Resource, ResourceExt}; -use linkerd_policy_controller_core::routes::{self, GroupKindName, GroupKindNamespaceName}; -use linkerd_policy_controller_k8s_api::policy; +use linkerd_policy_controller_core::routes; +use linkerd_policy_controller_k8s_api::gateway as api; use std::num::NonZeroU16; -#[derive(Debug, Clone)] -pub(crate) enum HttpRouteResource { - Linkerd(linkerd_policy_controller_k8s_api::policy::HttpRoute), - Gateway(api::HttpRoute), -} - -impl HttpRouteResource { - pub(crate) fn name(&self) -> String { - match self { - HttpRouteResource::Linkerd(route) => route.name_unchecked(), - HttpRouteResource::Gateway(route) => route.name_unchecked(), - } - } - - pub(crate) fn namespace(&self) -> String { - match self { - HttpRouteResource::Linkerd(route) => { - route.namespace().expect("HttpRoute must have a namespace") - } - HttpRouteResource::Gateway(route) => { - route.namespace().expect("HttpRoute must have a namespace") - } - } - } - - pub(crate) fn inner(&self) -> &api::CommonRouteSpec { - match self { - HttpRouteResource::Linkerd(route) => &route.spec.inner, - HttpRouteResource::Gateway(route) => &route.spec.inner, - } - } - - pub(crate) fn status(&self) -> Option<&api::RouteStatus> { - match self { - HttpRouteResource::Linkerd(route) => route.status.as_ref().map(|status| &status.inner), - HttpRouteResource::Gateway(route) => route.status.as_ref().map(|status| &status.inner), - } - } - - pub(crate) fn gknn(&self) -> GroupKindNamespaceName { - match self { - HttpRouteResource::Linkerd(route) => gkn_for_resource(route) - .namespaced(route.namespace().expect("Route must have namespace")), - HttpRouteResource::Gateway(route) => gkn_for_resource(route) - .namespaced(route.namespace().expect("Route must have namespace")), - } - } -} - pub fn try_match( api::HttpRouteMatch { path, @@ -91,18 +40,18 @@ pub fn try_match( pub fn path_match(path_match: api::HttpPathMatch) -> Result { match path_match { - api::HttpPathMatch::Exact { value } | api::HttpPathMatch::PathPrefix { value } - if !value.starts_with('/') => + api::HttpPathMatch::Exact { value } | api::HttpPathMatch::PathPrefix { value } + if !value.starts_with('/') => { Err(anyhow!("HttpPathMatch paths must be absolute (begin with `/`); {value:?} is not an absolute path")) } - api::HttpPathMatch::Exact { value } => Ok(routes::PathMatch::Exact(value)), - api::HttpPathMatch::PathPrefix { value } => Ok(routes::PathMatch::Prefix(value)), - api::HttpPathMatch::RegularExpression { value } => value - .parse() - .map(routes::PathMatch::Regex) - .map_err(Into::into), - } + api::HttpPathMatch::Exact { value } => Ok(routes::PathMatch::Exact(value)), + api::HttpPathMatch::PathPrefix { value } => Ok(routes::PathMatch::Prefix(value)), + api::HttpPathMatch::RegularExpression { value } => value + .parse() + .map(routes::PathMatch::Regex) + .map_err(Into::into), + } } pub fn host_match(hostname: api::Hostname) -> routes::HostMatch { @@ -201,29 +150,3 @@ fn path_modifier(path_modifier: api::HttpPathModifier) -> Result Ok(routes::PathModifier::Prefix(replace_prefix_match)), } } - -pub(crate) fn gkn_for_resource(t: &T) -> GroupKindName -where - T: kube::Resource, -{ - let kind = T::kind(&()); - let group = T::group(&()); - let name = t.name_unchecked().into(); - GroupKindName { group, kind, name } -} - -pub(crate) fn gkn_for_linkerd_http_route(name: String) -> GroupKindName { - GroupKindName { - group: policy::HttpRoute::group(&()), - kind: policy::HttpRoute::kind(&()), - name: name.into(), - } -} - -pub(crate) fn gkn_for_gateway_http_route(name: String) -> GroupKindName { - GroupKindName { - group: api::HttpRoute::group(&()), - kind: api::HttpRoute::kind(&()), - name: name.into(), - } -} diff --git a/policy-controller/src/admission.rs b/policy-controller/src/admission.rs index 76fa39e1e4944..ec7e0da047346 100644 --- a/policy-controller/src/admission.rs +++ b/policy-controller/src/admission.rs @@ -439,7 +439,6 @@ impl Validate for Admission { } } -use index::http_route; fn validate_match( httproute::HttpRouteMatch { path, @@ -448,18 +447,18 @@ fn validate_match( method, }: httproute::HttpRouteMatch, ) -> Result<()> { - let _ = path.map(http_route::path_match).transpose()?; + let _ = path.map(index::routes::http::path_match).transpose()?; let _ = method .as_deref() .map(core::routes::Method::try_from) .transpose()?; for q in query_params.into_iter().flatten() { - http_route::query_param_match(q)?; + index::routes::http::query_param_match(q)?; } for h in headers.into_iter().flatten() { - http_route::header_match(h)?; + index::routes::http::header_match(h)?; } Ok(()) @@ -472,12 +471,12 @@ impl Validate for Admission { match filter { httproute::HttpRouteFilter::RequestHeaderModifier { request_header_modifier, - } => http_route::header_modifier(request_header_modifier).map(|_| ()), + } => index::routes::http::header_modifier(request_header_modifier).map(|_| ()), httproute::HttpRouteFilter::ResponseHeaderModifier { response_header_modifier, - } => http_route::header_modifier(response_header_modifier).map(|_| ()), + } => index::routes::http::header_modifier(response_header_modifier).map(|_| ()), httproute::HttpRouteFilter::RequestRedirect { request_redirect } => { - http_route::req_redirect(request_redirect).map(|_| ()) + index::routes::http::req_redirect(request_redirect).map(|_| ()) } } } @@ -545,12 +544,12 @@ impl Validate for Admission { match filter { k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { request_header_modifier, - } => http_route::header_modifier(request_header_modifier).map(|_| ()), + } => index::routes::http::header_modifier(request_header_modifier).map(|_| ()), k8s_gateway_api::HttpRouteFilter::ResponseHeaderModifier { response_header_modifier, - } => http_route::header_modifier(response_header_modifier).map(|_| ()), + } => index::routes::http::header_modifier(response_header_modifier).map(|_| ()), k8s_gateway_api::HttpRouteFilter::RequestRedirect { request_redirect } => { - http_route::req_redirect(request_redirect).map(|_| ()) + index::routes::http::req_redirect(request_redirect).map(|_| ()) } k8s_gateway_api::HttpRouteFilter::RequestMirror { .. } => Ok(()), k8s_gateway_api::HttpRouteFilter::URLRewrite { .. } => Ok(()),