Skip to content

Commit

Permalink
docs + clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
meowjesty committed Oct 10, 2024
1 parent 3ec68d7 commit dd2c2d3
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 34 deletions.
64 changes: 34 additions & 30 deletions mirrord/intproxy/src/proxies/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use std::{
collections::{hash_map::Entry, HashMap},
error::Error,
fmt, io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
};
Expand Down Expand Up @@ -576,6 +575,7 @@ impl IncomingProxy {
/// If we cannot get the next frame of the streamed body, then we retry the whole
/// process, by sending the original `request` again through the http `interceptor` to
/// our hyper handler.
#[allow(clippy::type_complexity)]
#[tracing::instrument(level = Level::DEBUG, skip(self), ret)]
async fn streamed_http_response(
&mut self,
Expand All @@ -585,7 +585,13 @@ impl IncomingProxy {
let mut body = vec![];
let key = (response.connection_id, response.request_id);

match response.internal_response.body.next_frames(false).await {
match response
.internal_response
.body
.next_frames(false)
.await
.map_err(InterceptorError::from)
{
Ok(frames) => {
frames
.frames
Expand All @@ -612,38 +618,36 @@ impl IncomingProxy {
response,
)))
}
Err(fail) => {
tracing::warn!(?fail, "Error while receiving streamed response frames");
// Retry on `RST_STREAM` error.
Err(InterceptorError::Reset) => {
tracing::warn!("`RST_STREAM` received in the response, retrying!");

let interceptor = self
.interceptors
.get(&InterceptorId(response.connection_id))?;

// Retry on `RST_STREAM` error.
if fail
.source()
.and_then(|source| source.downcast_ref::<h2::Error>())
.is_some_and(|h2_fail| h2_fail.is_reset())
if let Some(HttpRequestFallback::Streamed { request, retries }) = request
&& retries < RETRY_ON_RESET_ATTEMPTS
{
let interceptor = self
.interceptors
.get(&InterceptorId(response.connection_id))?;

if let Some(HttpRequestFallback::Streamed { request, retries }) = request
&& retries < RETRY_ON_RESET_ATTEMPTS
{
tracing::trace!(
?request,
?retries,
"`RST_STREAM` from hyper, retrying the request."
);
interceptor
.tx
.send(HttpRequestFallback::Streamed {
request,
retries: retries + 1,
})
.await;
}
tracing::trace!(
?request,
?retries,
"`RST_STREAM` from hyper, retrying the request."
);
interceptor
.tx
.send(HttpRequestFallback::Streamed {
request,
retries: retries + 1,
})
.await;
}

return None;
None
}
Err(fail) => {
tracing::warn!(?fail, "Something went wrong, skipping this response!");
None
}
}
}
Expand Down
22 changes: 18 additions & 4 deletions mirrord/intproxy/src/proxies/incoming/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,17 @@ pub enum InterceptorError {
#[error("received an HTTP request, but expected raw bytes")]
UnexpectedHttpRequest,

/// We dig into the [`hyper::Error`] to try and see if it's an [`h2::Error`], checking
/// for [`h2::Error::is_reset`].
///
/// [`hyper::Error`] mentions that `source` is not a guaranteed thing we can check for,
/// so if you see any weird behavior, check that the [`h2`] crate is in sync with
/// whatever hyper changed (for errors).
#[error("HTTP2 `RST_STREAM` received, we should retry the hyper connection!")]
Reset,

/// We have reached the max number of attempts that we can retry our http connection,
/// due to a `RST_STREAM`, or when the connection has been closed too soon.
#[error("HTTP2 reached the maximun amount of retries!")]
MaxRetries,
}
Expand Down Expand Up @@ -351,8 +359,10 @@ impl HttpConnection {
}

/// Sends the given [`HttpRequestFallback`] to the server.
/// If the HTTP connection with server is closed too soon, starts a new connection
/// and retries using a [`Backoff`].
///
/// If we get a `RST_STREAM` error from the server, or the connection was closed too
/// soon starts a new connection and retries using a [`Backoff`] until we reach
/// [`RETRY_ON_RESET_ATTEMPTS`].
///
/// Returns [`HttpResponseFallback`] from the server.
#[tracing::instrument(level = Level::DEBUG, skip(self), ret, err)]
Expand All @@ -369,8 +379,12 @@ impl HttpConnection {

match self.handle_response(request.clone(), response).await {
Ok(response) => return Ok(response),
Err(InterceptorError::Reset) => {
tracing::warn!(?request, "Request connection was reset, retrying.");
Err(InterceptorError::Reset)
| Err(InterceptorError::ConnectionClosedTooSoon(_)) => {
tracing::warn!(
?request,
"`RST_STREAM` request connection was reset, retrying."
);
match duration {
Some(duration) => {
sleep(duration).await;
Expand Down

0 comments on commit dd2c2d3

Please sign in to comment.