Skip to content

Commit

Permalink
Support exit command
Browse files Browse the repository at this point in the history
This allows us to ensure that the transport exists after it has drained
all messages in the queue.
  • Loading branch information
benjaminwood committed Sep 19, 2023
1 parent 5c07e13 commit 437d765
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use url::Url;
use std::sync::Arc;
use tokio::sync::Mutex;
use std::process::exit;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
Expand All @@ -22,7 +23,8 @@ async fn main() {
let output_pipe_path = format!("/tmp/{}_2", runner_id);

let (pipe_tx, pipe_rx) = futures_channel::mpsc::unbounded();
tokio::spawn(read_pipe(input_pipe_path.clone(), pipe_tx));
let (exit_tx, mut exit_rx) = mpsc::channel(1);
tokio::spawn(read_pipe(input_pipe_path.clone(), pipe_tx, exit_tx));

let output_fifo = Arc::new(Mutex::new(OpenOptions::new().write(true).open(output_pipe_path).await.unwrap()));
let output_fifo_clone = Arc::clone(&output_fifo);
Expand All @@ -44,8 +46,14 @@ async fn main() {
})
};

pin_mut!(pipe_to_ws, ws_to_pipe);
future::select(pipe_to_ws, ws_to_pipe).await;
let exit_fut = async {
exit_rx.recv().await;
eprintln!("Received 'exit' command. Exiting...");
exit(0);
};

pin_mut!(pipe_to_ws, ws_to_pipe, exit_fut);
future::select(future::select(pipe_to_ws, ws_to_pipe), exit_fut).await;
},
Err(_) => {
eprintln!("Failed to connect to the WebSocket server. Exiting...");
Expand All @@ -56,7 +64,7 @@ async fn main() {

// Our helper method which will read data from the input pipe and send it along the
// sender provided.
async fn read_pipe(path: String, tx: futures_channel::mpsc::UnboundedSender<Message>) {
async fn read_pipe(path: String, tx: futures_channel::mpsc::UnboundedSender<Message>, exit_tx: mpsc::Sender<()>) {
let mut pipe = OpenOptions::new().read(true).open(path).await.unwrap();
let mut buf = Vec::new();
loop {
Expand All @@ -70,6 +78,11 @@ async fn read_pipe(path: String, tx: futures_channel::mpsc::UnboundedSender<Mess
// Only check for '\n' after reading all chunks
if let Some(i) = buf.iter().rposition(|&b| b == b'\n') {
let line = buf.drain(..=i).collect::<Vec<_>>();
let line_str = String::from_utf8(line.clone()).unwrap();
if line_str.trim() == "exit" {
exit_tx.send(()).await.expect("Failed to send exit signal");
return;
}
tx.unbounded_send(Message::text(String::from_utf8(line).unwrap())).unwrap();
}
}
Expand Down

0 comments on commit 437d765

Please sign in to comment.