Skip to content

Commit

Permalink
Merge pull request #43 from mpihlak/ensure-processing-order
Browse files Browse the repository at this point in the history
Restore ordering by having tracker alternate between client and server messages
  • Loading branch information
mpihlak authored Sep 9, 2024
2 parents e52162f + a6f24ba commit c2411eb
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 72 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 17 additions & 4 deletions mongo-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ impl MongoMessageProxy {
let remaining_bytes = (HEADER_LENGTH - self.buf.len()) as u64;
let len = reader.take(remaining_bytes).read_buf(&mut self.buf)
.await
.map_err(ProxyError::IoError)?;
.map_err(|e| {
warn!("error reading message header: {e}");
ProxyError::IoError(e)
})?;
debug!("Read {len} of the required header bytes.");

if len == 0 {
Expand All @@ -195,9 +198,13 @@ impl MongoMessageProxy {

OPCODE_COUNTER.with_label_values(&[&hdr.op_code.to_string()]).inc();

debug!("Proxying header bytes");
writer.write_all(&self.buf)
.await
.map_err(ProxyError::IoError)?;
.map_err(|e| {
warn!("error proxying message bytes: {e}");
ProxyError::IoError(e)
})?;

// Collect the message length worth of bytes. Don't take any more than
// is needed so that we can safely clear the buffer here without losing
Expand All @@ -208,7 +215,10 @@ impl MongoMessageProxy {
let remaining_bytes = (message_length - self.buf.len()) as u64;
let len = reader.take(remaining_bytes).read_buf(&mut self.buf)
.await
.map_err(ProxyError::IoError)?;
.map_err(|e| {
warn!("error reading message body: {e}");
ProxyError::IoError(e)
})?;
debug!("Read {len} of the required {message_length} bytes.");

if len == 0 {
Expand All @@ -219,7 +229,10 @@ impl MongoMessageProxy {

writer.write_all(&self.buf)
.await
.map_err(ProxyError::IoError)?;
.map_err(|e| {
warn!("error proxying message bytes: {e}");
ProxyError::IoError(e)
})?;

READ_BUFFER_SIZE.with_label_values(&[read_source]).set(self.buf.capacity() as f64);
READ_BUFFER_CAPACITY.with_label_values(&[read_source]).set(self.buf.capacity() as f64);
Expand Down
3 changes: 2 additions & 1 deletion proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mongoproxy"
version = "0.5.32"
version = "0.5.34"
authors = ["mpihlak <martin.pihlak@starship.co>"]
edition = "2018"

Expand All @@ -26,3 +26,4 @@ opentelemetry-jaeger = { version="0.11", features = ["tokio"] }
mongo-protocol = { path="../mongo-protocol" }
async-bson = { path="../async-bson" }
regex = { version = "1.8.1", features = ["unicode-case"] }
tracing-test = "0.2.5"
43 changes: 27 additions & 16 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use mongoproxy::dstaddr;
use mongoproxy::appconfig::AppConfig;
use mongoproxy::tracker::{MongoStatsTracker, TrackerMessage};

use mongo_protocol::{MongoMessage, MongoMessageProxy, MsgHeader, ProxyError};
use mongo_protocol::{MongoMessageProxy, ProxyError};


const JAEGER_ADDR: &str = "127.0.0.1:6831";
Expand All @@ -36,12 +36,10 @@ const SERVICE_NAME: &str = "mongoproxy";
// Max number of bytes to read from the network
const READ_BUFFER_SIZE: usize = 16384;

// The largest message we can expect from MongoDb (oversize to be safe)
const MAX_MONGO_MESSAGE_SIZE: usize = 64*1024*1024;

// Max number of events the client and server message channels can take.
// We ought to be able to buffer the maximum MongoDb message there.
const MAX_CHANNEL_EVENTS: usize = MAX_MONGO_MESSAGE_SIZE / READ_BUFFER_SIZE;
// Max number of pending events the client and server message channels can take.
// Overly large values are not needed, just some amount of buffering to allow
// some out of order messages to be processed.
const MAX_CHANNEL_EVENTS: usize = 5;

lazy_static! {
static ref MONGOPROXY_RUNTIME_INFO: CounterVec =
Expand Down Expand Up @@ -73,6 +71,12 @@ lazy_static! {
"mongoproxy_server_connect_time_seconds",
"Time it takes to look up and connect to a server",
&["server_addr"]).unwrap();

static ref TRACKER_CHANNEL_ERRORS_TOTAL: CounterVec =
register_counter_vec!(
"mongoproxy_tracker_channel_errors_total",
"Total number of errors from sending bytes to tracker channel",
&["tracker"]).unwrap();
}

#[tokio::main]
Expand Down Expand Up @@ -263,8 +267,8 @@ async fn handle_connection(server_addr: &str, client_stream: TcpStream, app: App
// Start the tracker to parse and track MongoDb messages from the input stream. This works by
// having the proxy tasks send a copy of the parsed Mongo message over a channel.

let (client_tracker, tracker_rx): (mpsc::Sender<TrackerMessage>, mpsc::Receiver<TrackerMessage>) = mpsc::channel(MAX_CHANNEL_EVENTS);
let server_tracker = client_tracker.clone();
let (client_tx, client_rx): (mpsc::Sender<TrackerMessage>, mpsc::Receiver<TrackerMessage>) = mpsc::channel(MAX_CHANNEL_EVENTS);
let (server_tx, server_rx): (mpsc::Sender<TrackerMessage>, mpsc::Receiver<TrackerMessage>) = mpsc::channel(MAX_CHANNEL_EVENTS);

let mut task_set = JoinSet::new();

Expand All @@ -273,7 +277,8 @@ async fn handle_connection(server_addr: &str, client_stream: TcpStream, app: App
&server_addr.to_string(),
server_addr,
app,
tracker_rx,
client_rx,
server_rx,
);

task_set.spawn(async move {
Expand All @@ -287,7 +292,7 @@ async fn handle_connection(server_addr: &str, client_stream: TcpStream, app: App
// Read messages from client and pass to server and client tracker.
let client_proxy = MongoMessageProxy::new(READ_BUFFER_SIZE, log_mongo_messages, tracing_enabled);
task_set.spawn(async move {
proxy_loop(true, client_proxy, read_client, write_server, client_tracker).await;
proxy_loop(true, client_proxy, read_client, write_server, client_tx).await;
debug!("Client proxy done");
}.instrument(info_span!("client proxy")));

Expand All @@ -296,7 +301,7 @@ async fn handle_connection(server_addr: &str, client_stream: TcpStream, app: App
// a separate task.
let server_proxy = MongoMessageProxy::new(READ_BUFFER_SIZE, log_mongo_messages, tracing_enabled);
task_set.spawn(async move {
proxy_loop(false, server_proxy, read_server, write_client, server_tracker).await;
proxy_loop(false, server_proxy, read_server, write_client, server_tx).await;
debug!("Server proxy done");
}.instrument(info_span!("server proxy")));

Expand All @@ -318,16 +323,22 @@ async fn proxy_loop(
mut proxy: MongoMessageProxy,
mut read_from: OwnedReadHalf,
mut write_to: OwnedWriteHalf,
tracker: Sender<(bool, MsgHeader, MongoMessage)>,
tracker: Sender<TrackerMessage>,
)
{
let read_source = if is_client { "client" } else { "server" };
let mut tracker_ok = true;

loop {
match proxy.proxy_mongo_message(read_source, &mut read_from, &mut write_to).await {
Ok((hdr, msg)) => {
if let Err(e) = tracker.send((is_client, hdr, msg)).await {
warn!("error sending message to server tracker: {e}");
break;
if !tracker_ok {
continue;
}
if let Err(e) = tracker.try_send((hdr, msg)) {
warn!("error sending message to server tracker, stopping: {e}");
tracker_ok = false;
TRACKER_CHANNEL_ERRORS_TOTAL.with_label_values(&[read_source]).inc();
}
},
Err(ProxyError::EOF) => break,
Expand Down
Loading

0 comments on commit c2411eb

Please sign in to comment.