Skip to content

Commit

Permalink
Merge pull request #5 from selectiveci/bruce/tweaks
Browse files Browse the repository at this point in the history
Clippy and formatting fixes
  • Loading branch information
benjaminwood authored Nov 19, 2023
2 parents 76913a5 + 4b6bae8 commit f6954fa
Showing 1 changed file with 53 additions and 39 deletions.
92 changes: 53 additions & 39 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#![warn(clippy::perf, clippy::style, clippy::pedantic)]

use futures_util::{future, pin_mut, StreamExt};
use std::process::exit;
use std::sync::Arc;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tokio::fs::OpenOptions;
use url::Url;
use std::sync::Arc;
use tokio::sync::Mutex;
use std::process::exit;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
Expand All @@ -19,51 +21,63 @@ async fn main() {
let url = Url::parse(&args[1]).unwrap();
let runner_id: String = args[2].clone();

let input_pipe_path = format!("/tmp/{}_1", runner_id);
let output_pipe_path = format!("/tmp/{}_2", runner_id);
let input_pipe_path = format!("/tmp/{runner_id}_1");
let output_pipe_path = format!("/tmp/{runner_id}_2");

let (pipe_tx, pipe_rx) = futures_channel::mpsc::unbounded();
let (exit_tx, mut exit_rx) = mpsc::channel(1);
tokio::spawn(read_pipe(input_pipe_path.clone(), pipe_tx, exit_tx));

let output_fifo = Arc::new(Mutex::new(OpenOptions::new().write(true).open(output_pipe_path).await.unwrap()));
let output_fifo = Arc::new(Mutex::new(
OpenOptions::new()
.write(true)
.open(output_pipe_path)
.await
.unwrap(),
));
let output_fifo_clone = Arc::clone(&output_fifo);

match connect_async(url).await {
Ok((ws_stream, _)) => {
let (write, read) = ws_stream.split();
let pipe_to_ws = pipe_rx.map(Ok).forward(write);
let ws_to_pipe = {
read.for_each(move |message| {
let output_fifo = Arc::clone(&output_fifo_clone);
async move {
let mut data = message.unwrap().into_data();
data.push('\n' as u8);
let mut output_fifo= output_fifo.lock().await;
output_fifo.write_all(&data).await.expect("Failed to write to output pipe");
output_fifo.flush().await.expect("Failed to flush output pipe");
}
})
};
if let Ok((ws_stream, _)) = connect_async(url).await {
let (write, read) = ws_stream.split();
let pipe_to_ws = pipe_rx.map(Ok).forward(write);
let ws_to_pipe = {
read.for_each(move |message| {
let output_fifo = Arc::clone(&output_fifo_clone);
async move {
let mut data = message.unwrap().into_data();
data.push(b'\n');
let mut output_fifo = output_fifo.lock().await;
output_fifo
.write_all(&data)
.await
.expect("Failed to write to output pipe");
output_fifo
.flush()
.await
.expect("Failed to flush output pipe");
drop(output_fifo);
}
})
};

let exit_fut = async {
exit_rx.recv().await;
exit(0);
};

let exit_fut = async {
exit_rx.recv().await;
exit(0);
};

pin_mut!(pipe_to_ws, ws_to_pipe, exit_fut);
future::select(future::select(pipe_to_ws, ws_to_pipe), exit_fut).await;
},
Err(_) => {
eprintln!("Failed to connect to the WebSocket server. Exiting...");
exit(1);
}
pin_mut!(pipe_to_ws, ws_to_pipe, exit_fut);
future::select(future::select(pipe_to_ws, ws_to_pipe), exit_fut).await;
} else {
eprintln!("Failed to connect to the WebSocket server. Exiting...");
exit(1);
}
}

// Our helper method which will read data from the input pipe and send it along the
// sender provided.
async fn read_pipe(path: String, tx: futures_channel::mpsc::UnboundedSender<Message>, exit_tx: mpsc::Sender<()>) {
async fn read_pipe(
path: String,
tx: futures_channel::mpsc::UnboundedSender<Message>,
exit_tx: mpsc::Sender<()>,
) {
let mut pipe = OpenOptions::new().read(true).open(path).await.unwrap();
let mut buf = Vec::new();
loop {
Expand Down

0 comments on commit f6954fa

Please sign in to comment.