Skip to content

Commit

Permalink
fixup! Add proxy_loop fn
Browse files Browse the repository at this point in the history
  • Loading branch information
mpihlak committed Sep 2, 2024
1 parent 3448efc commit f9b0a1b
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ async fn handle_connection(server_addr: &str, client_stream: TcpStream, app: App
// Read messages from client and pass to server and client tracker.
let client_proxy = MongoMessageProxy::new(READ_BUFFER_SIZE, log_mongo_messages, tracing_enabled);
task_set.spawn(async move {
proxy_loop("client", client_proxy, read_client, write_server, client_tracker).await;
proxy_loop(true, client_proxy, read_client, write_server, client_tracker).await;
debug!("Client proxy done");
}.instrument(info_span!("client proxy")));

Expand All @@ -296,7 +296,7 @@ async fn handle_connection(server_addr: &str, client_stream: TcpStream, app: App
// a separate task.
let server_proxy = MongoMessageProxy::new(READ_BUFFER_SIZE, log_mongo_messages, tracing_enabled);
task_set.spawn(async move {
proxy_loop("server", server_proxy, read_server, write_client, server_tracker).await;
proxy_loop(false, server_proxy, read_server, write_client, server_tracker).await;
debug!("Server proxy done");
}.instrument(info_span!("server proxy")));

Expand All @@ -314,17 +314,18 @@ async fn handle_connection(server_addr: &str, client_stream: TcpStream, app: App
}

async fn proxy_loop(
name: &str,
is_client: bool,
mut proxy: MongoMessageProxy,
mut read_from: OwnedReadHalf,
mut write_to: OwnedWriteHalf,
tracker: Sender<(bool, MsgHeader, MongoMessage)>,
)
{
let read_source = if is_client { "client" } else { "server" };
loop {
match proxy.proxy_mongo_message(name, &mut read_from, &mut write_to).await {
match proxy.proxy_mongo_message(read_source, &mut read_from, &mut write_to).await {
Ok((hdr, msg)) => {
if let Err(e) = tracker.send((false, hdr, msg)).await {
if let Err(e) = tracker.send((is_client, hdr, msg)).await {
warn!("error sending message to server tracker: {e}");
break;
}
Expand Down

0 comments on commit f9b0a1b

Please sign in to comment.