Skip to content

Commit

Permalink
Merge pull request #6 from selectiveci/handle-close-message
Browse files Browse the repository at this point in the history
Handle close message by exiting the transport
  • Loading branch information
benjaminwood authored Feb 28, 2024
2 parents f6954fa + f0548d9 commit 9eb6689
Showing 1 changed file with 42 additions and 14 deletions.
56 changes: 42 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ async fn main() {

let (pipe_tx, pipe_rx) = futures_channel::mpsc::unbounded();
let (exit_tx, mut exit_rx) = mpsc::channel(1);
tokio::spawn(read_pipe(input_pipe_path.clone(), pipe_tx, exit_tx));
let exit_tx_clone = exit_tx.clone(); // Clone exit_tx for use in the read_pipe function.
tokio::spawn(read_pipe(input_pipe_path.clone(), pipe_tx, exit_tx_clone));

let output_fifo = Arc::new(Mutex::new(
OpenOptions::new()
Expand All @@ -41,21 +42,48 @@ async fn main() {
let (write, read) = ws_stream.split();
let pipe_to_ws = pipe_rx.map(Ok).forward(write);
let ws_to_pipe = {
read.for_each(move |message| {
read.for_each(move |message_result| {
let output_fifo = Arc::clone(&output_fifo_clone);
let exit_tx_clone = exit_tx.clone();
async move {
let mut data = message.unwrap().into_data();
data.push(b'\n');
let mut output_fifo = output_fifo.lock().await;
output_fifo
.write_all(&data)
.await
.expect("Failed to write to output pipe");
output_fifo
.flush()
.await
.expect("Failed to flush output pipe");
drop(output_fifo);
if cfg!(debug_assertions) {
match &message_result {
Ok(Message::Text(_)) => println!("Received Text message"),
Ok(Message::Binary(_)) => println!("Received Binary message"),
Ok(Message::Ping(_)) => println!("Received Ping message"),
Ok(Message::Pong(_)) => println!("Received Pong message"),
Ok(Message::Close(_)) => println!("Received Close message, exiting..."),
Err(e) => eprintln!("Error receiving message: {}", e),
_ => println!("Received an unexpected type of message"),
}
}

match &message_result {
Ok(Message::Close(_)) => {
let _ = exit_tx_clone
.send(())
.await
.expect("Failed to send exit signal");
return;
}
_ => {}
}

if let Ok(message) = message_result {
let data = message.into_data();
let mut data_with_newline = data.clone();
data_with_newline.push(b'\n');
let mut output_fifo = output_fifo.lock().await;
output_fifo
.write_all(&data_with_newline)
.await
.expect("Failed to write to output pipe");
output_fifo
.flush()
.await
.expect("Failed to flush output pipe");
drop(output_fifo);
}
}
})
};
Expand Down

0 comments on commit 9eb6689

Please sign in to comment.