Skip to content

Commit

Permalink
Revert inbound policy discovery changes
Browse files Browse the repository at this point in the history
The changes--specifically those in 93b06e6--prevent control plane
boot-strapping: the identity controller is unable to serve requests
because its proxy can't contact the destination pod for policy because
the destination pod doesn't have identity yet.

Ultimately, we probably want to change Linkerd's control plane
deployment topology to avoid these bootstrapping issues. In the
meantime, we'll need to revisit our approach to these changes.

* Revert 89ee318 inbound: Introduce a `policy::LookupAddr` type (#2264)
* Revert 93b06e6 inbound: Remove default policies (#2204)
* Revert c186d88 inbound: connections wait for ServerPolicy discovery (#2186)
  • Loading branch information
olix0r committed Feb 24, 2023
1 parent 7948f13 commit 311a38f
Show file tree
Hide file tree
Showing 32 changed files with 709 additions and 1,023 deletions.
4 changes: 0 additions & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -891,13 +891,11 @@ dependencies = [
"linkerd2-proxy-api",
"once_cell",
"parking_lot",
"pin-project",
"thiserror",
"tokio",
"tokio-test",
"tonic",
"tower",
"tower-test",
"tracing",
]

Expand All @@ -912,7 +910,6 @@ dependencies = [
"http",
"http-body",
"hyper",
"ipnet",
"linkerd-app",
"linkerd-app-admin",
"linkerd-app-core",
Expand All @@ -921,7 +918,6 @@ dependencies = [
"linkerd-metrics",
"linkerd-tracing",
"linkerd2-proxy-api",
"maplit",
"parking_lot",
"regex",
"rustls-pemfile",
Expand Down
12 changes: 3 additions & 9 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,11 @@ struct Rescue;
// === impl Config ===

impl Config {
/// Builds the admin endpoint server.
///
/// This method is asynchronous, as it must discover a `ServerPolicy` for
/// the admin port.
#[allow(clippy::too_many_arguments)]
pub async fn build<B, R>(
pub fn build<B, R>(
self,
bind: B,
policy: &impl inbound::policy::GetPolicy,
policy: impl inbound::policy::GetPolicy,
identity: identity::Server,
report: R,
metrics: inbound::Metrics,
Expand All @@ -93,9 +89,7 @@ impl Config {
let (listen_addr, listen) = bind.bind(&self.server)?;

// Get the policy for the admin server.
let policy = policy
.get_policy(inbound::policy::LookupAddr(listen_addr.into()))
.await?;
let policy = policy.get_policy(OrigDstAddr(listen_addr.into()));

let (ready, latch) = crate::server::Readiness::new();
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
Expand Down
12 changes: 2 additions & 10 deletions linkerd/app/gateway/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use linkerd_app_core::{
svc::{NewService, ServiceExt},
tls,
trace::test::trace_init,
transport::ServerAddr,
Error, NameAddr,
};
use linkerd_app_inbound::GatewayLoop;
Expand Down Expand Up @@ -53,7 +52,7 @@ async fn upgraded_request_remains_relative_form() {

impl svc::Param<OrigDstAddr> for Target {
fn param(&self) -> OrigDstAddr {
OrigDstAddr(Self::dst_addr())
OrigDstAddr(([10, 10, 10, 10], 4143).into())
}
}

Expand Down Expand Up @@ -133,21 +132,14 @@ async fn upgraded_request_remains_relative_form() {
}]))]),
},
};
let (policy, tx) =
inbound::policy::AllowPolicy::for_test(ServerAddr(Self::dst_addr()), policy);
let (policy, tx) = inbound::policy::AllowPolicy::for_test(self.param(), policy);
tokio::spawn(async move {
tx.closed().await;
});
policy
}
}

impl Target {
fn dst_addr() -> std::net::SocketAddr {
([10, 10, 10, 10], 4143).into()
}
}

let (inner, mut handle) =
mock::pair::<http::Request<http::BoxBody>, http::Response<http::BoxBody>>();
handle.allow(1);
Expand Down
2 changes: 0 additions & 2 deletions linkerd/app/inbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.8", features = ["inbound"] }
once_cell = "1"
parking_lot = "0.12"
pin-project = "1"
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
tonic = { version = "0.8", default-features = false }
Expand Down Expand Up @@ -60,4 +59,3 @@ linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
tokio = { version = "1", features = ["full", "macros"] }
tokio-test = "0.4"
tower-test = "0.4"
139 changes: 115 additions & 24 deletions linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
policy::{self, AllowPolicy, GetPolicy},
policy::{AllowPolicy, GetPolicy},
Inbound,
};
use linkerd_app_core::{
Expand All @@ -10,9 +10,6 @@ use linkerd_app_core::{
use std::fmt::Debug;
use tracing::info_span;

#[cfg(test)]
mod tests;

#[derive(Clone, Debug)]
pub(crate) struct Accept {
client_addr: Remote<ClientAddr>,
Expand All @@ -29,7 +26,7 @@ impl<N> Inbound<N> {
pub(crate) fn push_accept<T, I, NSvc, D, DSvc>(
self,
proxy_port: u16,
policies: impl GetPolicy,
policies: impl GetPolicy + Clone + Send + Sync + 'static,
direct: D,
) -> Inbound<svc::ArcNewTcp<T, I>>
where
Expand All @@ -49,24 +46,6 @@ impl<N> Inbound<N> {
{
self.map_stack(|cfg, rt, accept| {
accept
.push_on_service(svc::MapErr::layer_boxed())
.push_map_target(|(policy, t): (AllowPolicy, T)| {
tracing::debug!(policy = ?&*policy.borrow(), "Accepted");
Accept {
client_addr: t.param(),
orig_dst_addr: t.param(),
policy,
}
})
.lift_new_with_target()
.push(policy::Discover::layer_via(policies, |t: &T| {
// For non-direct inbound connections, policies are always
// looked up for the original destination address.
let OrigDstAddr(addr) = t.param();
policy::LookupAddr(addr)
}))
.into_new_service()
.check_new_service::<T, I>()
.push_switch(
// Switch to the `direct` stack when a connection's original destination is the
// proxy's inbound port. Otherwise, check that connections are allowed on the
Expand All @@ -77,7 +56,13 @@ impl<N> Inbound<N> {
return Ok(svc::Either::B(t));
}

Ok(svc::Either::A(t))
let policy = policies.get_policy(addr);
tracing::debug!(policy = ?&*policy.borrow(), "Accepted");
Ok(svc::Either::A(Accept {
client_addr: t.param(),
orig_dst_addr: addr,
policy,
}))
},
direct,
)
Expand Down Expand Up @@ -120,3 +105,109 @@ impl svc::Param<AllowPolicy> for Accept {
self.policy.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
policy::{DefaultPolicy, Store},
test_util,
};
use futures::future;
use linkerd_app_core::{
svc::{NewService, ServiceExt},
Error,
};
use linkerd_proxy_server_policy::{Authentication, Authorization, Meta, ServerPolicy};
use std::sync::Arc;

#[tokio::test(flavor = "current_thread")]
async fn default_allow() {
let (io, _) = io::duplex(1);
let policies = Store::for_test(
ServerPolicy {
protocol: linkerd_proxy_server_policy::Protocol::Opaque(Arc::new([
Authorization {
authentication: Authentication::Unauthenticated,
networks: vec![Default::default()],
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "serverauthorization".into(),
name: "testsaz".into(),
}),
},
])),
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
name: "testsrv".into(),
}),
},
None,
);
inbound()
.with_stack(new_ok())
.push_accept(999, policies, new_panic("direct stack must not be built"))
.into_inner()
.new_service(Target(1000))
.oneshot(io)
.await
.expect("should succeed");
}

/// Default-deny authorizations are checked by an internal stack.
#[tokio::test(flavor = "current_thread")]
async fn default_deny() {
let policies = Store::for_test(DefaultPolicy::Deny, None);
let (io, _) = io::duplex(1);
inbound()
.with_stack(new_ok())
.push_accept(999, policies, new_panic("direct stack must not be built"))
.into_inner()
.new_service(Target(1000))
.oneshot(io)
.await
.expect("should succeed");
}

#[tokio::test(flavor = "current_thread")]
async fn direct() {
let policies = Store::for_test(DefaultPolicy::Deny, None);
let (io, _) = io::duplex(1);
inbound()
.with_stack(new_panic("detect stack must not be built"))
.push_accept(999, policies, new_ok())
.into_inner()
.new_service(Target(999))
.oneshot(io)
.await
.expect("should succeed");
}

fn inbound() -> Inbound<()> {
Inbound::new(test_util::default_config(), test_util::runtime().0)
}

fn new_panic<T>(msg: &'static str) -> svc::ArcNewTcp<T, io::DuplexStream> {
svc::ArcNewService::new(move |_| panic!("{msg}"))
}

fn new_ok<T>() -> svc::ArcNewTcp<T, io::DuplexStream> {
svc::ArcNewService::new(|_| svc::BoxService::new(svc::mk(|_| future::ok::<(), Error>(()))))
}

#[derive(Clone, Debug)]
struct Target(u16);

impl svc::Param<OrigDstAddr> for Target {
fn param(&self) -> OrigDstAddr {
OrigDstAddr(([192, 0, 2, 2], self.0).into())
}
}

impl svc::Param<Remote<ClientAddr>> for Target {
fn param(&self) -> Remote<ClientAddr> {
Remote(ClientAddr(([192, 0, 2, 3], 54321).into()))
}
}
}
Loading

0 comments on commit 311a38f

Please sign in to comment.