diff --git a/Cargo.lock b/Cargo.lock
index 7a3ecfed409..04a4fb7c7c2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3945,6 +3945,7 @@ dependencies = [
  "actix-codec",
  "async-stream",
  "base64 0.22.1",
+ "bytes",
  "futures",
  "http-body 1.0.1",
  "hyper 1.4.1",
@@ -3953,6 +3954,7 @@ dependencies = [
  "mirrord-config",
  "mirrord-progress",
  "mirrord-protocol",
+ "pin-project-lite",
  "rand",
  "regex",
  "rstest 0.22.0",
diff --git a/changelog.d/2759.fixed.md b/changelog.d/2759.fixed.md
new file mode 100644
index 00000000000..b4b10c1a92c
--- /dev/null
+++ b/changelog.d/2759.fixed.md
@@ -0,0 +1 @@
+Add a retry for the port-forward agent connection if an error was received via the error channel after the websocket was established.
diff --git a/mirrord/cli/src/main.rs b/mirrord/cli/src/main.rs
index 2f56a646d0b..295fc118032 100644
--- a/mirrord/cli/src/main.rs
+++ b/mirrord/cli/src/main.rs
@@ -608,7 +608,7 @@ fn init_ext_error_handler(commands: &Commands) -> bool {
             let _ = miette::set_hook(Box::new(|_| Box::new(JSONReportHandler::new())));
             true
         }
-        Commands::InternalProxy => true,
+        Commands::InternalProxy | Commands::ExternalProxy => true,
         _ => false,
     }
 }
diff --git a/mirrord/kube/Cargo.toml b/mirrord/kube/Cargo.toml
index 38a7da713d8..80ea4fd5efd 100644
--- a/mirrord/kube/Cargo.toml
+++ b/mirrord/kube/Cargo.toml
@@ -27,6 +27,7 @@ mirrord-protocol = { path = "../protocol" }
 
 actix-codec.workspace = true
 async-stream = "0.3"
+bytes = "1"
 futures.workspace = true
 k8s-openapi.workspace = true
 kube.workspace = true
@@ -39,6 +40,7 @@ thiserror.workspace = true
 tokio.workspace = true
 tracing.workspace = true
 tokio-retry = "0.3"
+pin-project-lite = "0.2"
 
 [dev-dependencies]
 base64.workspace = true
diff --git a/mirrord/kube/src/api/kubernetes.rs b/mirrord/kube/src/api/kubernetes.rs
index 18e8c1c4d0a..9b7f2a428e1 100644
--- a/mirrord/kube/src/api/kubernetes.rs
+++ b/mirrord/kube/src/api/kubernetes.rs
@@ -1,9 +1,6 @@
 use std::ops::Deref;
 
-use k8s_openapi::{
-    api::core::v1::{Namespace, Pod},
-    NamespaceResourceScope,
-};
+use k8s_openapi::{api::core::v1::Namespace, NamespaceResourceScope};
 use kube::{
     api::ListParams,
     config::{KubeConfigOptions, Kubeconfig},
@@ -17,7 +14,7 @@ use mirrord_config::{
 use mirrord_progress::Progress;
 use mirrord_protocol::MeshVendor;
 use serde::{Deserialize, Serialize};
-use tracing::{debug, info, trace};
+use tracing::{debug, info};
 
 use crate::{
     api::{
@@ -33,6 +30,8 @@ use crate::{
     error::{KubeApiError, Result},
 };
 
+#[cfg(not(feature = "incluster"))]
+pub mod portforwarder;
 pub mod rollout;
 pub mod seeker;
 
@@ -99,6 +98,7 @@ impl KubernetesAPI {
     ) -> Result<TcpStream> {
         use std::{net::IpAddr, time::Duration};
 
+        use k8s_openapi::api::core::v1::Pod;
         use tokio::net::TcpStream;
 
         let pod_api: Api<Pod> = get_k8s_resource_api(&self.client, namespace.as_deref());
@@ -115,7 +115,7 @@ impl KubernetesAPI {
             let ip = pod_ip
                 .parse::<IpAddr>()
                 .map_err(|e| KubeApiError::invalid_value(&pod, "status.podIp", e))?;
-            trace!("connecting to pod {pod_ip}:{agent_port}");
+            tracing::trace!("connecting to pod {pod_ip}:{agent_port}");
 
             tokio::time::timeout(
                 Duration::from_secs(self.agent.startup_timeout),
@@ -128,7 +128,7 @@ impl KubernetesAPI {
                 Some(namespace) => format!("{pod_name}.{namespace}"),
                 None => pod_name,
             };
-            trace!("connecting to pod {hostname}:{agent_port}");
+            tracing::trace!("connecting to pod {hostname}:{agent_port}");
 
             tokio::time::timeout(
                 Duration::from_secs(self.agent.startup_timeout),
@@ -147,26 +147,10 @@ impl KubernetesAPI {
         &self,
         connect_info: AgentKubernetesConnectInfo,
     ) -> Result<Box<dyn UnpinStream>> {
-        use tokio_retry::{
-            strategy::{jitter, ExponentialBackoff},
-            Retry,
-        };
+        let (stream, portforward) =
+            portforwarder::retry_portforward(&self.client, connect_info).await?;
 
-        let pod_api: Api<Pod> =
-            get_k8s_resource_api(&self.client, connect_info.namespace.as_deref());
-        let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3);
-        let ports = &[connect_info.agent_port];
-        let mut port_forwarder = Retry::spawn(retry_strategy, || {
-            trace!("port-forward to pod {:?}", &connect_info);
-            pod_api.portforward(&connect_info.pod_name, ports)
-        })
-        .await?;
-
-        let stream = port_forwarder
-            .take_stream(connect_info.agent_port)
-            .ok_or(KubeApiError::PortForwardFailed)?;
-
-        let stream: Box<dyn UnpinStream> = Box::new(stream);
+        tokio::spawn(portforward.into_retry_future());
 
         Ok(stream)
     }
diff --git a/mirrord/kube/src/api/kubernetes/portforwarder.rs b/mirrord/kube/src/api/kubernetes/portforwarder.rs
new file mode 100644
index 00000000000..92cf9375b0c
--- /dev/null
+++ b/mirrord/kube/src/api/kubernetes/portforwarder.rs
@@ -0,0 +1,220 @@
+use std::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+    time::Duration,
+};
+
+use k8s_openapi::api::core::v1::Pod;
+use kube::{Api, Client};
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+use tokio_retry::{
+    strategy::{jitter, ExponentialBackoff},
+    Retry,
+};
+
+use crate::{
+    api::kubernetes::{get_k8s_resource_api, AgentKubernetesConnectInfo, UnpinStream},
+    error::{KubeApiError, Result},
+};
+
+pin_project! {
+    /// A wrapper for [`AsyncRead`] & [`AsyncWrite`] that doesn't call shutdown through the
+    /// [`AsyncWrite`] API; shutdown is performed manually with `manual_shutdown`.
+    struct ManualShutdown<S> {
+        #[pin]
+        inner: S,
+    }
+}
+
+impl<S> ManualShutdown<S> {
+    fn new(inner: S) -> Self {
+        ManualShutdown { inner }
+    }
+}
+
+impl<S> ManualShutdown<S>
+where
+    S: AsyncWrite + Unpin,
+{
+    async fn manual_shutdown(&mut self) -> Result<(), std::io::Error> {
+        self.inner.shutdown().await
+    }
+}
+
+impl<S> AsyncRead for ManualShutdown<S>
+where
+    S: AsyncRead,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        self.project().inner.poll_read(cx, buf)
+    }
+}
+
+impl<S> AsyncWrite for ManualShutdown<S>
+where
+    S: AsyncWrite,
+{
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<Result<usize, std::io::Error>> {
+        self.project().inner.poll_write(cx, buf)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
+        self.project().inner.poll_flush(cx)
+    }
+
+    /// Does nothing; call `manual_shutdown` for the actual inner shutdown call.
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        Poll::Ready(Ok(()))
+    }
+}
+
+type RetryStrategy = dyn Iterator<Item = Duration> + Send;
+
+async fn create_portforward_streams(
+    pod_api: &Api<Pod>,
+    connect_info: &AgentKubernetesConnectInfo,
+    retry_strategy: &mut RetryStrategy,
+) -> Result<(
+    Box<dyn UnpinStream>,
+    Box<dyn Future<Output = Option<String>> + Unpin + Send>,
+)> {
+    let ports = &[connect_info.agent_port];
+    let mut port_forwarder = Retry::spawn(retry_strategy, || {
+        tracing::trace!("port-forward to pod {:?}", &connect_info);
+        pod_api.portforward(&connect_info.pod_name, ports)
+    })
+    .await?;
+
+    let stream = Box::new(
+        port_forwarder
+            .take_stream(connect_info.agent_port)
+            .ok_or(KubeApiError::PortForwardFailed)?,
+    );
+
+    let error_future = Box::new(
+        port_forwarder
+            .take_error(connect_info.agent_port)
+            .ok_or(KubeApiError::PortForwardFailed)?,
+    );
+
+    Ok((stream, error_future))
+}
+
+pub struct SinglePortForwarder {
+    connect_info: AgentKubernetesConnectInfo,
+
+    retry_strategy: Box<RetryStrategy>,
+
+    pod_api: Api<Pod>,
+
+    sink: ManualShutdown<Box<dyn UnpinStream>>,
+
+    stream: Box<dyn UnpinStream>,
+
+    error_future: Box<dyn Future<Output = Option<String>> + Unpin + Send>,
+}
+
+impl SinglePortForwarder {
+    pub async fn connect(
+        client: &Client,
+        connect_info: AgentKubernetesConnectInfo,
+        sink: Box<dyn UnpinStream>,
+    ) -> Result<Self> {
+        let mut retry_strategy = Box::new(ExponentialBackoff::from_millis(10).map(jitter).take(5));
+
+        let pod_api: Api<Pod> = get_k8s_resource_api(client, connect_info.namespace.as_deref());
+
+        let (stream, error_future) =
+            create_portforward_streams(&pod_api, &connect_info, &mut retry_strategy).await?;
+
+        let sink = ManualShutdown::new(sink);
+
+        Ok(SinglePortForwarder {
+            connect_info,
+            retry_strategy,
+            pod_api,
+            sink,
+            stream,
+            error_future,
+        })
+    }
+
+    pub async fn into_retry_future(self) {
+        let SinglePortForwarder {
+            mut error_future,
+            mut stream,
+            mut sink,
+            retry_strategy,
+            connect_info,
+            pod_api,
+        } = self;
+
+        let mut retry_strategy = retry_strategy.peekable();
+
+        loop {
+            tokio::select! {
+                error = error_future.as_mut() => {
+
+                    if retry_strategy.peek().is_none() || error.is_none() {
+                        tracing::warn!(?connect_info, "retry strategy exhausted or error channel closed, closing connection");
+
+                        break;
+                    }
+
+                    if let Some(error) = error {
+                        tracing::warn!(?connect_info, %error, "error while performing port-forward, retrying");
+                    }
+
+                    match create_portforward_streams(&pod_api, &connect_info, &mut retry_strategy).await {
+                        Ok((next_stream, next_error_future)) => {
+                            let _ = stream.shutdown().await;
+
+                            stream = next_stream;
+                            error_future = next_error_future;
+
+                            tracing::trace!(?connect_info, "retry connect successful");
+                        }
+                        Err(error) => {
+                            tracing::error!(?connect_info, %error, "retry connect failed");
+
+                            break;
+                        }
+                    }
+                }
+                copy_result = tokio::io::copy_bidirectional(&mut stream, &mut sink) => {
+                    if let Err(error) = copy_result {
+                        tracing::error!(?connect_info, %error, "unable to copy data between the agent stream and the local sink");
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        let _ = sink.manual_shutdown().await;
+    }
+}
+
+pub async fn retry_portforward(
+    client: &Client,
+    connect_info: AgentKubernetesConnectInfo,
+) -> Result<(Box<dyn UnpinStream>, SinglePortForwarder)> {
+    let (lhs, rhs) = tokio::io::duplex(4096);
+
+    let port_forwarder = SinglePortForwarder::connect(client, connect_info, Box::new(rhs)).await?;
+
+    Ok((Box::new(lhs), port_forwarder))
+}
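The new `retry_portforward` keeps the agent connection stable across port-forward failures by handing the caller one end of an in-memory duplex pipe: `SinglePortForwarder::into_retry_future` runs as a background task that copies bytes between the other end and the current kube port-forward stream, and re-establishes the forward (per the retry strategy) whenever the error channel reports a failure. Below is a minimal, self-contained sketch of that duplex + `copy_bidirectional` pattern; it is not part of this diff, and the echo task merely stands in for the remote port-forward stream.

```rust
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // `lhs` goes to the caller, `rhs` stays with the forwarding task,
    // mirroring `tokio::io::duplex(4096)` in `retry_portforward`.
    let (mut lhs, mut rhs) = io::duplex(4096);

    // Stand-in for the stream taken from kube's `Portforwarder`; in the real
    // code this is the piece that gets replaced when a retry succeeds.
    let (mut remote_client, mut remote_server) = io::duplex(4096);

    // Background task shoveling bytes both ways, like
    // `copy_bidirectional(&mut stream, &mut sink)` in `into_retry_future`.
    let copier = tokio::spawn(async move {
        let _ = io::copy_bidirectional(&mut remote_client, &mut rhs).await;
    });

    // Simple echo on the "remote" side so the example round-trips data.
    tokio::spawn(async move {
        let mut buf = [0u8; 5];
        remote_server.read_exact(&mut buf).await.unwrap();
        remote_server.write_all(&buf).await.unwrap();
    });

    // The caller only ever holds `lhs`, no matter what happens to the remote stream.
    lhs.write_all(b"hello").await?;
    let mut reply = [0u8; 5];
    lhs.read_exact(&mut reply).await?;
    assert_eq!(&reply, b"hello");

    copier.abort();
    Ok(())
}
```

The `ManualShutdown` wrapper serves the same goal: when an old port-forward stream dies and `copy_bidirectional` tries to shut the sink down, the no-op `poll_shutdown` keeps the caller's end of the pipe open so a retried stream can resume, and the real shutdown only happens via `manual_shutdown` once retries are exhausted.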