Skip to content

Commit

Permalink
add test-support policy controller
Browse files Browse the repository at this point in the history
This is necessary now that a policy controller is always required.
Otherwise, the tests all fail. :)
  • Loading branch information
hawkw committed Dec 21, 2022
1 parent 9b2e7da commit 24d0e79
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 0 deletions.
4 changes: 4 additions & 0 deletions linkerd/app/integration/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub fn identity() -> identity::Controller {
identity::Controller::default()
}

pub fn policy() -> policy::Controller {
policy::Controller::default()
}

pub type Labels = HashMap<String, String>;

pub type DstReceiver = UnboundedReceiverStream<Result<pb::Update, grpc::Status>>;
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ pub mod client;
pub mod controller;
pub mod identity;
pub mod metrics;
pub mod policy;
pub mod proxy;
pub mod server;
pub mod tap;
Expand Down
148 changes: 148 additions & 0 deletions linkerd/app/integration/src/policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use super::*;
use futures::stream;
use parking_lot::Mutex;
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic as grpc;

pub use linkerd2_proxy_api::inbound;

#[derive(Clone, Debug, Default)]
pub struct Controller {
inbound_calls: Arc<Mutex<VecDeque<(inbound::PortSpec, InboundReceiver)>>>,
inbound_default: Option<inbound::Server>,
expected_workload: Option<Arc<String>>,
}

#[derive(Debug, Clone)]
pub struct InboundSender(mpsc::UnboundedSender<Result<inbound::Server, grpc::Status>>);

type InboundReceiver = UnboundedReceiverStream<Result<inbound::Server, grpc::Status>>;

impl Controller {
pub fn new() -> Self {
Self::default()
}

pub fn expect_workload(self, workload: String) -> Self {
Self {
expected_workload: Some(workload.into()),
..self
}
}

pub fn inbound_tx(&self, port: u16) -> InboundSender {
let spec = inbound::PortSpec {
workload: String::new(),
port: port as u32,
};

let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);
self.inbound_calls.lock().push_back((spec, rx));
InboundSender(tx)
}

pub fn with_inbound_default(mut self, default: inbound::Server) -> Self {
self.inbound_default = Some(default);
self
}

pub async fn run(self) -> controller::Listening {
controller::run(
inbound::inbound_server_policies_server::InboundServerPoliciesServer::new(self),
"support policy controller",
None,
)
.await
}
}

impl InboundSender {
pub fn send(&self, up: inbound::Server) {
self.0.send(Ok(up)).expect("send inbound Server update")
}

pub fn send_err(&self, err: grpc::Status) {
self.0.send(Err(err)).expect("send inbound error")
}
}

#[tonic::async_trait]
impl inbound::inbound_server_policies_server::InboundServerPolicies for Controller {
type WatchPortStream =
Pin<Box<dyn Stream<Item = Result<inbound::Server, grpc::Status>> + Send + Sync + 'static>>;

async fn get_port(
&self,
_req: grpc::Request<inbound::PortSpec>,
) -> Result<grpc::Response<inbound::Server>, grpc::Status> {
Err(grpc::Status::new(
grpc::Code::Unimplemented,
"the proxy should only make `WatchPort` RPCs to the inbound policy \
service, so `GetPort` is not implemented by the test controller",
))
}
async fn watch_port(
&self,
req: grpc::Request<inbound::PortSpec>,
) -> Result<grpc::Response<Self::WatchPortStream>, grpc::Status> {
let req = req.into_inner();
let _span = tracing::info_span!(
"InboundPolicies::watch_port",
req.port,
%req.workload,
)
.entered();
tracing::debug!("received request");

if let Some(ref expected_workload) = self.expected_workload {
if req.workload != **expected_workload {
tracing::warn!(
actual = ?req.workload,
expected = ?expected_workload,
"request workload does not match"
);
return Err(grpc_unexpected_request());
}
}

let mut calls = self.inbound_calls.lock();
if let Some((spec, policy)) = calls.pop_front() {
tracing::debug!(?spec, "checking next call");
if spec.port == req.port {
tracing::info!(?spec, ?policy, "found request");
return Ok(grpc::Response::new(Box::pin(policy)));
}

tracing::warn!(?spec, ?policy, "request does not match");
calls.push_front((spec, policy));

if let Some(default) = self.inbound_default.clone() {
tracing::info!("using default inbound policy");
let stream =
Box::pin(stream::once(async move { Ok(default) }).chain(stream::pending()));
return Ok(grpc::Response::new(stream));
}

return Err(grpc_unexpected_request());
}

Err(grpc_no_results())
}
}

fn grpc_no_results() -> grpc::Status {
grpc::Status::new(
grpc::Code::Unavailable,
"unit test policy controller has no results",
)
}

fn grpc_unexpected_request() -> grpc::Status {
grpc::Status::new(
grpc::Code::Unavailable,
"unit test policy controller expected different request",
)
}
39 changes: 39 additions & 0 deletions linkerd/app/integration/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub fn new() -> Proxy {
pub struct Proxy {
controller: Option<controller::Listening>,
identity: Option<controller::Listening>,
policy: Option<controller::Listening>,

/// Inbound/outbound addresses helpful for mocking connections that do not
/// implement `server::Listener`.
Expand Down Expand Up @@ -44,6 +45,7 @@ pub struct Listening {

controller: controller::Listening,
identity: controller::Listening,
policy: controller::Listening,

shutdown: Shutdown,
terminated: oneshot::Receiver<()>,
Expand Down Expand Up @@ -122,6 +124,15 @@ impl Proxy {
self
}

/// Pass a customized support policy controller for this proxy to use.
///
/// If not called, this proxy will be configured with a default policy
/// controller.
pub fn policy(mut self, p: controller::Listening) -> Self {
self.policy = Some(p);
self
}

pub fn disable_identity(mut self) -> Self {
self.identity = None;
self
Expand Down Expand Up @@ -223,6 +234,7 @@ impl Listening {
thread,
shutdown,
terminated,
policy,
..
} = self;

Expand All @@ -249,9 +261,14 @@ impl Listening {
}
.instrument(tracing::info_span!("inbound"));

let policy_controller = policy
.join()
.instrument(tracing::info_span!("policy_controller"));

tokio::join! {
inbound,
outbound,
policy_controller,
identity.join(),
controller.join(),
};
Expand All @@ -267,6 +284,16 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
controller::new().run().await
};

let policy = match proxy.policy {
Some(policy) => policy,
None => {
controller::policy()
.with_inbound_default(Default::default())
.run()
.await
}
};

let identity = if let Some(identity) = proxy.identity {
identity
} else {
Expand Down Expand Up @@ -295,6 +322,13 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
"LINKERD2_PROXY_DESTINATION_SVC_ADDR",
controller.addr.to_string(),
);

env.put("LINKERD2_PROXY_POLICY_SVC_ADDR", policy.addr.to_string());
env.put(
"LINKERD2_PROXY_POLICY_SVC_NAME",
"policy.linkerd.serviceaccount.identity.linkerd.cluster.local".to_string(),
);

if random_ports {
env.put(app::env::ENV_OUTBOUND_LISTEN_ADDR, "127.0.0.1:0".to_owned());
}
Expand Down Expand Up @@ -346,6 +380,10 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
);
}

if !env.contains_key(app::env::ENV_POLICY_WORKLOAD) {
env.put(app::env::ENV_POLICY_WORKLOAD, "test:test".into());
}

let config = app::env::parse_config(&env).unwrap();

let dispatch = tracing::Dispatch::default();
Expand Down Expand Up @@ -466,6 +504,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {

controller,
identity,
policy,

shutdown: tx,
terminated: term_rx,
Expand Down

0 comments on commit 24d0e79

Please sign in to comment.