Improve retry for port-forward variant of mirrord agent connection (metalbear-co#2761)

* Simple port-forward retry

* This?

* Ops

* Update

* Fix External proxy trace logs

* Changelog

* Errors

* Ops

* Simpler approach

* Docs
DmitryDodzin authored Sep 18, 2024
1 parent 356e727 commit e713731
Showing 6 changed files with 236 additions and 27 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions changelog.d/2759.fixed.md
@@ -0,0 +1 @@
Add a retry for the port-forward agent connection if an error was received via the error channel after the websocket was established.
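
In short, the fix watches the per-port error channel that kube's port-forwarder exposes and re-establishes the forward when that channel reports an error after the websocket is up. Below is a minimal sketch of the idea, assuming a hypothetical forward_with_retry helper and a default-namespaced pod; the real implementation is in portforwarder.rs further down, which additionally keeps an in-flight copy_bidirectional running, swaps in the fresh stream, and backs off via tokio-retry.

use k8s_openapi::api::core::v1::Pod;
use kube::{Api, Client};

// Hypothetical helper: forward `port` of `pod_name`, reconnecting when the
// error channel fires after the websocket has been established.
async fn forward_with_retry(client: Client, pod_name: &str, port: u16) -> kube::Result<()> {
    let pods: Api<Pod> = Api::default_namespaced(client);

    loop {
        let mut pf = pods.portforward(pod_name, &[port]).await?;

        // The forwarded byte stream; the real code hands this to copy_bidirectional.
        let _stream = pf.take_stream(port).expect("stream missing for forwarded port");

        // Resolves to Some(message) if the forward fails after being established,
        // or None once it finishes cleanly.
        let error_future = pf.take_error(port).expect("error channel missing for forwarded port");

        match error_future.await {
            Some(error) => tracing::warn!(%error, "port-forward failed after connect, retrying"),
            None => break,
        }
    }

    Ok(())
}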
2 changes: 1 addition & 1 deletion mirrord/cli/src/main.rs
@@ -608,7 +608,7 @@ fn init_ext_error_handler(commands: &Commands) -> bool {
let _ = miette::set_hook(Box::new(|_| Box::new(JSONReportHandler::new())));
true
}
Commands::InternalProxy => true,
Commands::InternalProxy | Commands::ExternalProxy => true,
_ => false,
}
}
2 changes: 2 additions & 0 deletions mirrord/kube/Cargo.toml
@@ -27,6 +27,7 @@ mirrord-protocol = { path = "../protocol" }

actix-codec.workspace = true
async-stream = "0.3"
bytes = "1"
futures.workspace = true
k8s-openapi.workspace = true
kube.workspace = true
@@ -39,6 +40,7 @@ thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
tokio-retry = "0.3"
pin-project-lite = "0.2"

[dev-dependencies]
base64.workspace = true
36 changes: 10 additions & 26 deletions mirrord/kube/src/api/kubernetes.rs
@@ -1,9 +1,6 @@
use std::ops::Deref;

use k8s_openapi::{
api::core::v1::{Namespace, Pod},
NamespaceResourceScope,
};
use k8s_openapi::{api::core::v1::Namespace, NamespaceResourceScope};
use kube::{
api::ListParams,
config::{KubeConfigOptions, Kubeconfig},
@@ -17,7 +14,7 @@ use mirrord_config::{
use mirrord_progress::Progress;
use mirrord_protocol::MeshVendor;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, trace};
use tracing::{debug, info};

use crate::{
api::{
@@ -33,6 +30,8 @@ use crate::{
error::{KubeApiError, Result},
};

#[cfg(not(feature = "incluster"))]
pub mod portforwarder;
pub mod rollout;
pub mod seeker;

@@ -99,6 +98,7 @@ impl KubernetesAPI {
) -> Result<tokio::net::TcpStream> {
use std::{net::IpAddr, time::Duration};

use k8s_openapi::api::core::v1::Pod;
use tokio::net::TcpStream;

let pod_api: Api<Pod> = get_k8s_resource_api(&self.client, namespace.as_deref());
@@ -115,7 +115,7 @@ impl KubernetesAPI {
let ip = pod_ip
.parse::<IpAddr>()
.map_err(|e| KubeApiError::invalid_value(&pod, "status.podIp", e))?;
trace!("connecting to pod {pod_ip}:{agent_port}");
tracing::trace!("connecting to pod {pod_ip}:{agent_port}");

tokio::time::timeout(
Duration::from_secs(self.agent.startup_timeout),
@@ -128,7 +128,7 @@ impl KubernetesAPI {
Some(namespace) => format!("{pod_name}.{namespace}"),
None => pod_name,
};
trace!("connecting to pod {hostname}:{agent_port}");
tracing::trace!("connecting to pod {hostname}:{agent_port}");

tokio::time::timeout(
Duration::from_secs(self.agent.startup_timeout),
@@ -147,26 +147,10 @@ impl KubernetesAPI {
&self,
connect_info: AgentKubernetesConnectInfo,
) -> Result<Box<dyn UnpinStream>> {
use tokio_retry::{
strategy::{jitter, ExponentialBackoff},
Retry,
};
let (stream, portforward) =
portforwarder::retry_portforward(&self.client, connect_info).await?;

let pod_api: Api<Pod> =
get_k8s_resource_api(&self.client, connect_info.namespace.as_deref());
let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3);
let ports = &[connect_info.agent_port];
let mut port_forwarder = Retry::spawn(retry_strategy, || {
trace!("port-forward to pod {:?}", &connect_info);
pod_api.portforward(&connect_info.pod_name, ports)
})
.await?;

let stream = port_forwarder
.take_stream(connect_info.agent_port)
.ok_or(KubeApiError::PortForwardFailed)?;

let stream: Box<dyn UnpinStream> = Box::new(stream);
tokio::spawn(portforward.into_retry_future());

Ok(stream)
}
220 changes: 220 additions & 0 deletions mirrord/kube/src/api/kubernetes/portforwarder.rs
@@ -0,0 +1,220 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use k8s_openapi::api::core::v1::Pod;
use kube::{Api, Client};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_retry::{
strategy::{jitter, ExponentialBackoff},
Retry,
};

use crate::{
api::kubernetes::{get_k8s_resource_api, AgentKubernetesConnectInfo, UnpinStream},
error::{KubeApiError, Result},
};

pin_project! {
/// A wrapper for [`AsyncRead`] & [`AsyncWrite`] that doesn't call shutdown through the [`AsyncWrite`] API;
/// shutdown is instead performed manually via `manual_shutdown`.
struct ManualShutdown<S> {
#[pin]
inner: S,
}
}

impl<S> ManualShutdown<S> {
fn new(inner: S) -> Self {
ManualShutdown { inner }
}
}

impl<S> ManualShutdown<S>
where
S: AsyncWrite + Unpin,
{
async fn manual_shutdown(&mut self) -> Result<(), std::io::Error> {
self.inner.shutdown().await
}
}

impl<S> AsyncRead for ManualShutdown<S>
where
S: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.project().inner.poll_read(cx, buf)
}
}

impl<S> AsyncWrite for ManualShutdown<S>
where
S: AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
self.project().inner.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
self.project().inner.poll_flush(cx)
}

/// Does nothing; call `manual_shutdown` to actually shut down the inner stream.
fn poll_shutdown(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
}

type RetryStrategy = dyn Iterator<Item = Duration> + Send;

async fn create_portforward_streams(
pod_api: &Api<Pod>,
connect_info: &AgentKubernetesConnectInfo,
retry_strategy: &mut RetryStrategy,
) -> Result<(
Box<dyn UnpinStream>,
Box<dyn Future<Output = Option<String>> + Unpin + Send>,
)> {
let ports = &[connect_info.agent_port];
let mut port_forwarder = Retry::spawn(retry_strategy, || {
tracing::trace!("port-forward to pod {:?}", &connect_info);
pod_api.portforward(&connect_info.pod_name, ports)
})
.await?;

let stream = Box::new(
port_forwarder
.take_stream(connect_info.agent_port)
.ok_or(KubeApiError::PortForwardFailed)?,
);

let error_future = Box::new(
port_forwarder
.take_error(connect_info.agent_port)
.ok_or(KubeApiError::PortForwardFailed)?,
);

Ok((stream, error_future))
}

pub struct SinglePortForwarder {
connect_info: AgentKubernetesConnectInfo,

retry_strategy: Box<RetryStrategy>,

pod_api: Api<Pod>,

sink: ManualShutdown<Box<dyn UnpinStream>>,

stream: Box<dyn UnpinStream>,

error_future: Box<dyn Future<Output = Option<String>> + Unpin + Send>,
}

impl SinglePortForwarder {
pub async fn connect(
client: &Client,
connect_info: AgentKubernetesConnectInfo,
sink: Box<dyn UnpinStream>,
) -> Result<Self> {
let mut retry_strategy = Box::new(ExponentialBackoff::from_millis(10).map(jitter).take(5));

let pod_api: Api<Pod> = get_k8s_resource_api(client, connect_info.namespace.as_deref());

let (stream, error_future) =
create_portforward_streams(&pod_api, &connect_info, &mut retry_strategy).await?;

let sink = ManualShutdown::new(sink);

Ok(SinglePortForwarder {
connect_info,
retry_strategy,
pod_api,
sink,
stream,
error_future,
})
}

pub async fn into_retry_future(self) {
let SinglePortForwarder {
mut error_future,
mut stream,
mut sink,
retry_strategy,
connect_info,
pod_api,
} = self;

let mut retry_strategy = retry_strategy.peekable();

loop {
tokio::select! {
error = error_future.as_mut() => {

if retry_strategy.peek().is_none() || error.is_none() {
tracing::warn!(?connect_info, "fininsed retry strategy closing connection");

break;
}

if let Some(error) = error {
tracing::warn!(?connect_info, %error, "error while performing port-forward, retrying");
}

match create_portforward_streams(&pod_api, &connect_info, &mut retry_strategy).await {
Ok((next_stream, next_error_future)) => {
let _ = stream.shutdown().await;

stream = next_stream;
error_future = next_error_future;

tracing::trace!(?connect_info, "retry connect successful");
}
Err(error) => {
tracing::error!(?connect_info, %error, "retry connect failed");

break;
}
}
}
copy_result = tokio::io::copy_bidirectional(&mut stream, &mut sink) => {
if let Err(error) = copy_result {
tracing::error!(?connect_info, %error, "unable to copy_bidirectional agent stream to local sink");

break;
}
}
}
}

let _ = sink.manual_shutdown().await;
}
}

pub async fn retry_portforward(
client: &Client,
connect_info: AgentKubernetesConnectInfo,
) -> Result<(Box<dyn UnpinStream>, SinglePortForwarder)> {
let (lhs, rhs) = tokio::io::duplex(4096);

let port_forwarder = SinglePortForwarder::connect(client, connect_info, Box::new(rhs)).await?;

Ok((Box::new(lhs), port_forwarder))
}
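
For reference, a sketch of how a caller consumes the new API, mirroring the create_connection change in kubernetes.rs above (client and connect_info assumed to be in scope):

// The caller keeps one half of an in-memory duplex pipe; the forwarder task
// copies between the other half and the live port-forward, reconnecting on error.
let (stream, portforward) = portforwarder::retry_portforward(&client, connect_info).await?;

// Drive reconnection in the background; `stream` remains usable across retries.
tokio::spawn(portforward.into_retry_future());

// `stream` is a Box<dyn UnpinStream>, i.e. a plain AsyncRead + AsyncWrite handle.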
