Skip to content

Commit

Permalink
Removing usage of Event type in wui
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasLYang committed Jul 31, 2024
1 parent 9d21857 commit 8354fbe
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 29 deletions.
12 changes: 9 additions & 3 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use turborepo_env::EnvironmentVariableMap;
use turborepo_repository::package_graph::{PackageGraph, PackageName, PackageNode};
use turborepo_scm::SCM;
use turborepo_telemetry::events::generic::GenericEventBuilder;
use turborepo_ui::{cprint, cprintln, tui, tui::AppSender, BOLD_GREY, GREY, UI};
use turborepo_ui::{
cprint, cprintln, tui, tui::AppSender, wui, wui::WebUISender, BOLD_GREY, GREY, UI,
};

pub use crate::run::error::Error;
use crate::{
Expand Down Expand Up @@ -182,8 +184,12 @@ impl Run {
&& tui::terminal_big_enough()?)
}

pub fn start_web_ui(&self) -> Result<Option<JoinHandle<Result<(), tui::Error>>>, Error> {
turborepo_ui::wui::start_ws_server(self.ui.rx.clone())
pub fn start_web_ui(
&self,
) -> Result<Option<(WebUISender, JoinHandle<Result<(), wui::Error>>)>, Error> {
let (tx, rx) = tokio::sync::broadcast::channel(100);
let handle = tokio::spawn(turborepo_ui::wui::start_ws_server(rx));
Ok(Some((WebUISender { tx }, handle)))
}

#[allow(clippy::type_complexity)]
Expand Down
58 changes: 32 additions & 26 deletions crates/turborepo-ui/src/wui/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
//! Web UI for Turborepo. Creates a WebSocket server that can be subscribed to
//! by a web client to display the status of tasks.

use std::net::SocketAddr;

use axum::{
extract::{
ws::{Message, WebSocket},
State, WebSocketUpgrade,
},
http::Response,
response::IntoResponse,
routing::get,
Router,
};
use serde::Serialize;
use thiserror::Error;
use tokio::net::TcpListener;

use crate::tui::event::{CacheResult, Event, OutputLogs, TaskResult};

Expand All @@ -27,10 +23,14 @@ pub enum Error {
WebSocket(#[source] axum::Error),
}

pub struct WebUISender {
pub tx: tokio::sync::broadcast::Sender<WebUIEvent>,
}

// Specific events that the websocket server can send to the client,
// not all the `Event` types from the TUI
#[derive(Debug, Clone, Serialize)]
enum WsEvent {
enum WebUIEvent {
StartTask {
task: String,
output_logs: OutputLogs,
Expand All @@ -53,59 +53,65 @@ enum WsEvent {
},
}

impl WsEvent {
impl WebUIEvent {
fn from_event(event: Event) -> Option<Self> {
match event {
Event::StartTask { task, output_logs } => {
Some(WsEvent::StartTask { task, output_logs })
Some(WebUIEvent::StartTask { task, output_logs })
}
Event::TaskOutput { task, output } => Some(WsEvent::TaskOutput { task, output }),
Event::EndTask { task, result } => Some(WsEvent::EndTask { task, result }),
Event::TaskOutput { task, output } => Some(WebUIEvent::TaskOutput { task, output }),
Event::EndTask { task, result } => Some(WebUIEvent::EndTask { task, result }),
Event::Status {
task,
status,
result,
} => Some(WsEvent::Status {
} => Some(WebUIEvent::Status {
task,
status,
result,
}),
Event::UpdateTasks { tasks } => Some(WsEvent::UpdateTasks { tasks }),
Event::UpdateTasks { tasks } => Some(WebUIEvent::UpdateTasks { tasks }),
_ => None,
}
}
}

#[derive(Clone)]
struct AppState {
rx: tokio::sync::mpsc::Receiver<Event>,
rx: tokio::sync::broadcast::Receiver<WebUIEvent>,
}

impl Clone for AppState {
fn clone(&self) -> Self {
Self {
rx: self.rx.resubscribe(),
}
}
}

async fn handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> impl IntoResponse {
ws.on_upgrade(handle_socket)
ws.on_upgrade(|socket| handle_socket(socket, state))
}

async fn handle_socket(mut socket: WebSocket, state: AppState) {
let mut state = state.clone();
while let Some(event) = state.rx.recv().await {
if let Some(ws_event) = WsEvent::from_event(event) {
let message_payload = serde_json::to_string(&ws_event).unwrap();
if socket.send(Message::Text(message_payload)).await.is_err() {
// client disconnected
return;
}
while let Ok(event) = state.rx.recv().await {
let message_payload = serde_json::to_string(&event).unwrap();
if socket.send(Message::Text(message_payload)).await.is_err() {
// client disconnected
return;
}
}
}

pub async fn start_ws_server(rx: tokio::sync::mpsc::Receiver<Event>) -> Result<(), Error> {
pub async fn start_ws_server(
rx: tokio::sync::broadcast::Receiver<WebUIEvent>,
) -> Result<(), Error> {
let app = Router::new()
.route("/ws", get(handler))
.with_state(AppState { rx });

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;

Ok(axum_server::bind(addr)
.serve(app.into_make_service())
.await?)
Ok(())
}

0 comments on commit 8354fbe

Please sign in to comment.