diff --git a/README.md b/README.md index cee37ef..1558357 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ OPTIONS: -V, --version Print version information SUBCOMMANDS: + build Build all services or a specific one config Get the config of a service down Stop all services help Print this message or the help of the given subcommand(s) diff --git a/examples/nginx-nodejs-redis/Superfile.hcl b/examples/nginx-nodejs-redis/Superfile.hcl index 2925a20..f0a194f 100644 --- a/examples/nginx-nodejs-redis/Superfile.hcl +++ b/examples/nginx-nodejs-redis/Superfile.hcl @@ -20,6 +20,9 @@ services = [ flox = { "environment" = ".#nginx-nodejs-redis" } + build = { + "command" = "npm install" + } }, { "name" = "redis" diff --git a/proto/superviseur/v1alpha1/control.proto b/proto/superviseur/v1alpha1/control.proto index 1cb87e3..83b9be4 100644 --- a/proto/superviseur/v1alpha1/control.proto +++ b/proto/superviseur/v1alpha1/control.proto @@ -66,6 +66,15 @@ message ListResponse { repeated objects.v1alpha1.Service services = 1; } +message BuildRequest { + string name = 1; + string config_file_path = 2; +} + +message BuildResponse { + bool success = 1; +} + service ControlService { rpc LoadConfig (LoadConfigRequest) returns (LoadConfigResponse) {} rpc Start (StartRequest) returns (StartResponse) {} @@ -73,5 +82,6 @@ service ControlService { rpc Restart (RestartRequest) returns (RestartResponse) {} rpc Status (StatusRequest) returns (StatusResponse) {} rpc List (ListRequest) returns (ListResponse) {} + rpc Build (BuildRequest) returns (BuildResponse) {} rpc ListRunningProcesses (ListRunningProcessesRequest) returns (ListRunningProcessesResponse) {} } \ No newline at end of file diff --git a/src/api/superviseur.v1alpha1.rs b/src/api/superviseur.v1alpha1.rs index a08b688..5661d38 100644 --- a/src/api/superviseur.v1alpha1.rs +++ b/src/api/superviseur.v1alpha1.rs @@ -92,6 +92,20 @@ pub struct ListResponse { #[prost(message, repeated, tag = "1")] pub services: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BuildRequest { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub config_file_path: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BuildResponse { + #[prost(bool, tag = "1")] + pub success: bool, +} /// Generated client implementations. pub mod control_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] @@ -275,6 +289,25 @@ pub mod control_service_client { ); self.inner.unary(request.into_request(), path, codec).await } + pub async fn build( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/superviseur.v1alpha1.ControlService/Build", + ); + self.inner.unary(request.into_request(), path, codec).await + } pub async fn list_running_processes( &mut self, request: impl tonic::IntoRequest, @@ -330,6 +363,10 @@ pub mod control_service_server { &self, request: tonic::Request, ) -> Result, tonic::Status>; + async fn build( + &self, + request: tonic::Request, + ) -> Result, tonic::Status>; async fn list_running_processes( &self, request: tonic::Request, @@ -619,6 +656,43 @@ pub mod control_service_server { }; Box::pin(fut) } + "/superviseur.v1alpha1.ControlService/Build" => { + #[allow(non_camel_case_types)] + struct BuildSvc(pub Arc); + impl< + T: ControlService, + > tonic::server::UnaryService for BuildSvc { + type Response = super::BuildResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { (*inner).build(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = BuildSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/superviseur.v1alpha1.ControlService/ListRunningProcesses" => { #[allow(non_camel_case_types)] struct ListRunningProcessesSvc(pub Arc); diff --git a/src/cmd/build.rs b/src/cmd/build.rs new file mode 100644 index 0000000..82f4acc --- /dev/null +++ b/src/cmd/build.rs @@ -0,0 +1,39 @@ +use anyhow::Error; +use tokio::net::UnixStream; +use tonic::transport::{ Endpoint, Uri}; +use tower::service_fn; + +use crate::{config::verify_if_config_file_is_present, types::{SUPERFILE, UNIX_SOCKET_PATH}, api::superviseur::v1alpha1::{BuildRequest, control_service_client::ControlServiceClient, LoadConfigRequest}}; + +pub async fn execute_build(name: Option<&str>) -> Result<(), Error> { + verify_if_config_file_is_present()?; + let current_dir = std::env::current_dir()?; + let config = std::fs::read_to_string(current_dir.join(SUPERFILE))?; + + let channel = Endpoint::try_from("http://[::]:50051")? + .connect_with_connector(service_fn(move |_: Uri| UnixStream::connect(UNIX_SOCKET_PATH))) + .await + .map_err(|_| + Error::msg(format!("Cannot connect to the Superviseur daemon at unix:{}. Is the superviseur daemon running?", UNIX_SOCKET_PATH)))?; + + // let mut client = ControlServiceClient::connect("http://127.0.0.1:5476").await?; + let mut client = ControlServiceClient::new(channel); + + let request = tonic::Request::new(LoadConfigRequest { + config, + file_path: current_dir.to_str().unwrap().to_string(), + }); + + client.load_config(request).await?; + + let name = name.unwrap_or_default().to_string(); + + let request = tonic::Request::new(BuildRequest { + name, + config_file_path: current_dir.to_str().unwrap().to_string(), + }); + + client.build(request).await?; + + Ok(()) +} diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 81dd88a..4728550 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -1,3 +1,4 @@ +pub mod build; pub mod config; pub mod init; pub mod list; diff --git a/src/cmd/new.rs b/src/cmd/new.rs index 4d90966..38b4f97 100644 --- a/src/cmd/new.rs +++ b/src/cmd/new.rs @@ -30,6 +30,7 @@ pub fn execute_new(cfg_format: ConfigFormat) { stderr: "/tmp/demo-stderr.log".to_string(), wait_for: None, flox: None, + build: None, }], }; let serialized = match cfg_format { diff --git a/src/cmd/ui.rs b/src/cmd/ui.rs index b7661d5..6d74414 100644 --- a/src/cmd/ui.rs +++ b/src/cmd/ui.rs @@ -52,5 +52,4 @@ pub async fn execute_ui() -> Result<(), Error> { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } - Ok(()) } diff --git a/src/graphql/schema/objects/subscriptions.rs b/src/graphql/schema/objects/subscriptions.rs index 9db40ba..8935dc4 100644 --- a/src/graphql/schema/objects/subscriptions.rs +++ b/src/graphql/schema/objects/subscriptions.rs @@ -148,3 +148,41 @@ impl AllServicesRestarted { &self.payload } } + +#[derive(Default, Clone)] +pub struct ServiceBuilding { + pub payload: Service, + pub process: Process, +} + +#[Object] +impl ServiceBuilding { + async fn payload(&self) -> &Service { + &self.payload + } +} + +#[derive(Default, Clone)] +pub struct ServiceBuilt { + pub payload: Service, + pub process: Process, +} + +#[Object] +impl ServiceBuilt { + async fn payload(&self) -> &Service { + &self.payload + } +} + +#[derive(Default, Clone)] +pub struct AllServicesBuilt { + pub payload: Vec, +} + +#[Object] +impl AllServicesBuilt { + async fn payload(&self) -> &Vec { + &self.payload + } +} diff --git a/src/main.rs b/src/main.rs index d539936..438630e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,9 +2,10 @@ use anyhow::Error; use clap::{arg, Command}; use superviseur::{ cmd::{ - config::execute_config, init::execute_init, list::execute_list, log::execute_log, - new::execute_new, ps::execute_ps, restart::execute_restart, start::execute_start, - status::execute_status, stop::execute_stop, tail::execute_tail, ui::execute_ui, + build::execute_build, config::execute_config, init::execute_init, list::execute_list, + log::execute_log, new::execute_new, ps::execute_ps, restart::execute_restart, + start::execute_start, status::execute_status, stop::execute_stop, tail::execute_tail, + ui::execute_ui, }, server, types::configuration::ConfigFormat, @@ -89,6 +90,9 @@ A simple process supervisor"#, .subcommand(Command::new("up").about("Start all services")) .subcommand(Command::new("down").about("Stop all services")) .subcommand(Command::new("ui").about("Start the superviseur ui")) + .subcommand(Command::new("build") + .arg(arg!([name] "The name of the service to build, if not specified, all services will be built")) + .about("Build all services or a specific one")) } #[tokio::main] @@ -147,6 +151,10 @@ async fn main() -> Result<(), Error> { Some(("up", _)) => execute_start(None).await?, Some(("down", _)) => execute_stop(None).await?, Some(("ui", _)) => execute_ui().await?, + Some(("build", args)) => { + let name = args.value_of("name"); + execute_build(name).await?; + } _ => cli().print_help()?, } Ok(()) diff --git a/src/server/control.rs b/src/server/control.rs index a46c6f8..cb39796 100644 --- a/src/server/control.rs +++ b/src/server/control.rs @@ -1,6 +1,5 @@ use std::{ collections::HashMap, - path::Path, sync::{Arc, Mutex}, thread, time::Duration, @@ -15,10 +14,10 @@ use crate::{ api::{ objects::v1alpha1::Service, superviseur::v1alpha1::{ - control_service_server::ControlService, ListRequest, ListResponse, - ListRunningProcessesRequest, ListRunningProcessesResponse, LoadConfigRequest, - LoadConfigResponse, RestartRequest, RestartResponse, StartRequest, StartResponse, - StatusRequest, StatusResponse, StopRequest, StopResponse, + control_service_server::ControlService, BuildRequest, BuildResponse, ListRequest, + ListResponse, ListRunningProcessesRequest, ListRunningProcessesResponse, + LoadConfigRequest, LoadConfigResponse, RestartRequest, RestartResponse, StartRequest, + StartResponse, StatusRequest, StatusResponse, StopRequest, StopResponse, }, }, superviseur::core::{ProcessEvent, Superviseur, SuperviseurCommand}, @@ -401,4 +400,46 @@ impl ControlService for Control { }; Ok(Response::new(list_response)) } + + async fn build( + &self, + request: Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let path = request.config_file_path; + let name = request.name; + let config_map = self.config_map.lock().unwrap(); + + if !config_map.contains_key(&path) { + return Err(tonic::Status::not_found("Config file not found")); + } + + let config = config_map.get(&path).unwrap(); + + if name.len() > 0 { + let service = config + .services + .iter() + .find(|s| s.name == name) + .ok_or_else(|| tonic::Status::not_found("Service not found"))?; + + service + .build + .as_ref() + .ok_or_else(|| tonic::Status::invalid_argument("Service has no build command"))?; + + self.cmd_tx + .send(SuperviseurCommand::Build( + service.clone(), + config.project.clone(), + )) + .map_err(|e| tonic::Status::internal(e.to_string()))?; + return Ok(Response::new(BuildResponse { success: true })); + } + + self.cmd_tx + .send(SuperviseurCommand::BuildAll(config.project.clone())) + .map_err(|e| tonic::Status::internal(e.to_string()))?; + Ok(Response::new(BuildResponse { success: true })) + } } diff --git a/src/server/core.rs b/src/server/core.rs index c69d9a6..63af5d2 100644 --- a/src/server/core.rs +++ b/src/server/core.rs @@ -1,6 +1,5 @@ use std::{ collections::HashMap, - error::Error, sync::{Arc, Mutex}, thread, }; diff --git a/src/superviseur/core.rs b/src/superviseur/core.rs index cfb7c9e..20a3502 100644 --- a/src/superviseur/core.rs +++ b/src/superviseur/core.rs @@ -16,8 +16,9 @@ use crate::{ schema::{ self, objects::subscriptions::{ - AllServicesRestarted, AllServicesStarted, AllServicesStopped, ServiceRestarted, - ServiceStarted, ServiceStarting, ServiceStopped, ServiceStopping, + AllServicesBuilt, AllServicesRestarted, AllServicesStarted, AllServicesStopped, + ServiceBuilding, ServiceBuilt, ServiceRestarted, ServiceStarted, ServiceStarting, + ServiceStopped, ServiceStopping, }, }, simple_broker::SimpleBroker, @@ -69,12 +70,14 @@ pub enum SuperviseurCommand { Start(Service, String), Stop(Service, String), Restart(Service, String), + Build(Service, String), LoadConfig(ConfigurationData, String), WatchForChanges(String, Service, String), StartDependency(Service, String), StartAll(String), StopAll(String), RestartAll(String), + BuildAll(String), } #[derive(Debug)] @@ -87,6 +90,9 @@ pub enum ProcessEvent { AllStarted(String), AllStopped(String), AllRestarted(String), + Building(String, String), + Built(String, String), + AllBuilt(String), } struct SuperviseurInternal { @@ -282,6 +288,27 @@ impl SuperviseurInternal { Ok(()) } + fn handle_build(&mut self, service: Service, project: String) -> Result<(), Error> { + self.event_tx + .send(ProcessEvent::Building( + service.name.clone(), + project.clone(), + )) + .unwrap(); + + let service_graph = self.service_graph.lock().unwrap(); + let graph = service_graph + .clone() + .into_iter() + .filter(|(_, key)| *key == project) + .map(|(s, _)| s) + .next() + .ok_or(anyhow::anyhow!("Project {} not found", project))?; + let mut visited = vec![false; graph.size()]; + graph.build_service(&service, &mut visited); + Ok(()) + } + fn handle_watch_for_changes( &mut self, dir: String, @@ -330,6 +357,19 @@ impl SuperviseurInternal { Ok(()) } + fn handle_build_all(&mut self, project: String) -> Result<(), Error> { + let service_graph = self.service_graph.lock().unwrap(); + let graph = service_graph + .clone() + .into_iter() + .filter(|(_, key)| *key == project) + .map(|(s, _)| s) + .next() + .ok_or(anyhow::anyhow!("Project {} not found", project))?; + graph.build_services(); + Ok(()) + } + fn handle_command(&mut self, cmd: SuperviseurCommand) -> Result<(), Error> { match cmd { SuperviseurCommand::Load(service, project) => self.handle_load(service, project), @@ -346,6 +386,8 @@ impl SuperviseurInternal { SuperviseurCommand::StartAll(project) => self.handle_start_all(project), SuperviseurCommand::StopAll(project) => self.handle_stop_all(project), SuperviseurCommand::RestartAll(project) => self.handle_restart_all(project), + SuperviseurCommand::Build(service, project) => self.handle_build(service, project), + SuperviseurCommand::BuildAll(project) => self.handle_build_all(project), } } @@ -450,6 +492,43 @@ impl SuperviseurInternal { process: process.into(), }); } + ProcessEvent::Building(service_name, project) => { + let mut process = &mut processes + .iter_mut() + .find(|(p, key)| p.name == service_name && key == &project) + .unwrap() + .0; + process.state = State::Building; + // call SimpleBroker::publish + let service = self.get_service(&service_name, &project)?; + let mut service = schema::objects::service::Service::from(&service); + service.status = String::from("BUILDING"); + SimpleBroker::publish(ServiceBuilding { + payload: service.clone(), + process: process.into(), + }); + } + ProcessEvent::Built(service_name, project) => { + let mut process = &mut processes + .iter_mut() + .find(|(p, key)| p.name == service_name && key == &project) + .unwrap() + .0; + process.state = State::Stopped; + // call SimpleBroker::publish + let service = self.get_service(&service_name, &project)?; + let mut service = schema::objects::service::Service::from(&service); + service.status = String::from("STOPPED"); + SimpleBroker::publish(ServiceBuilt { + payload: service.clone(), + process: process.into(), + }); + } + ProcessEvent::AllBuilt(project) => { + // call SimpleBroker::publish + let services = self.get_project_services(&project)?; + SimpleBroker::publish(AllServicesBuilt { payload: services }); + } } Ok(()) } diff --git a/src/superviseur/dependencies.rs b/src/superviseur/dependencies.rs index bd89bd8..0b75a2c 100644 --- a/src/superviseur/dependencies.rs +++ b/src/superviseur/dependencies.rs @@ -200,4 +200,33 @@ impl DependencyGraph { .stop(self.project.clone()) .unwrap(); } + + pub fn build_services(&self) { + let mut visited = vec![false; self.vertices.len()]; + for vertex in self.vertices.clone().into_iter() { + self.build_service(&vertex.into(), &mut visited); + } + } + + pub fn build_service(&self, service: &Service, visited: &mut Vec) { + let index = self + .vertices + .iter() + .position(|v| v.name == service.name) + .unwrap(); + if visited[index] { + return; + } + visited[index] = true; + for edge in self.edges.iter().filter(|e| e.from == index) { + let service = self.vertices[edge.to].clone().into(); + self.build_service(&service, visited); + } + + println!("Building service {}", self.vertices[index].name); + self.vertices[index] + .driver + .build(self.project.clone()) + .unwrap(); + } } diff --git a/src/superviseur/drivers/exec/driver.rs b/src/superviseur/drivers/exec/driver.rs index 5993d60..8b9acea 100644 --- a/src/superviseur/drivers/exec/driver.rs +++ b/src/superviseur/drivers/exec/driver.rs @@ -1,11 +1,13 @@ use std::{ collections::HashMap, io::{BufRead, Write}, + process::{ChildStderr, ChildStdout}, sync::{Arc, Mutex}, thread, }; use anyhow::Error; +use owo_colors::OwoColorize; use tokio::sync::mpsc; use crate::{ @@ -56,6 +58,42 @@ impl Driver { event_tx, } } + + pub fn write_logs(&self, stdout: ChildStdout, stderr: ChildStderr) { + let cloned_service = self.service.clone(); + + thread::spawn(move || { + let service = cloned_service; + let id = service.id.unwrap_or("-".to_string()); + // write stdout to file + let mut log_file = std::fs::File::create(service.stdout).unwrap(); + + let stdout = std::io::BufReader::new(stdout); + for line in stdout.lines() { + let line = line.unwrap(); + let line = format!("{}\n", line); + SimpleBroker::publish(TailLogStream { + id: id.clone(), + line: line.clone(), + }); + SimpleBroker::publish(LogStream { + id: id.clone(), + line: line.clone(), + }); + let service_name = format!("{} | ", service.name); + print!("{} {}", service_name.cyan(), line); + log_file.write_all(line.as_bytes()).unwrap(); + } + + // write stderr to file + let mut err_file = std::fs::File::create(service.stderr).unwrap(); + let stderr = std::io::BufReader::new(stderr); + for line in stderr.lines() { + let line = line.unwrap(); + err_file.write_all(line.as_bytes()).unwrap(); + } + }); + } } impl DriverPlugin for Driver { @@ -63,6 +101,7 @@ impl DriverPlugin for Driver { let command = &self.service.command; let envs = self.service.env.clone(); let working_dir = self.service.working_dir.clone(); + println!("command: {}", command); let mut child = std::process::Command::new("sh") .arg("-c") .arg(command) @@ -97,37 +136,7 @@ impl DriverPlugin for Driver { let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); - let cloned_service = self.service.clone(); - - thread::spawn(move || { - let service = cloned_service; - let id = service.id.unwrap_or("-".to_string()); - // write stdout to file - let mut log_file = std::fs::File::create(service.stdout).unwrap(); - - let stdout = std::io::BufReader::new(stdout); - for line in stdout.lines() { - let line = line.unwrap(); - let line = format!("{}\n", line); - SimpleBroker::publish(TailLogStream { - id: id.clone(), - line: line.clone(), - }); - SimpleBroker::publish(LogStream { - id: id.clone(), - line: line.clone(), - }); - log_file.write_all(line.as_bytes()).unwrap(); - } - - // write stderr to file - let mut err_file = std::fs::File::create(service.stderr).unwrap(); - let stderr = std::io::BufReader::new(stderr); - for line in stderr.lines() { - let line = line.unwrap(); - err_file.write_all(line.as_bytes()).unwrap(); - } - }); + self.write_logs(stdout, stderr); Ok(()) } @@ -209,4 +218,33 @@ impl DriverPlugin for Driver { fn exec(&self) -> Result<(), Error> { Ok(()) } + + fn build(&self, project: String) -> Result<(), Error> { + if let Some(build) = self.service.build.clone() { + let envs = self.service.env.clone(); + let working_dir = self.service.working_dir.clone(); + + let build_command = build.command.clone(); + println!("build_command: {}", build_command); + let mut child = std::process::Command::new("sh") + .arg("-c") + .arg(build_command) + .current_dir(working_dir) + .envs(envs) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn()?; + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + self.write_logs(stdout, stderr); + + child.wait()?; + + self.event_tx.send(ProcessEvent::Built( + self.service.name.clone(), + project.clone(), + ))?; + } + Ok(()) + } } diff --git a/src/superviseur/drivers/flox/driver.rs b/src/superviseur/drivers/flox/driver.rs index 8cd0d4a..2c9ea13 100644 --- a/src/superviseur/drivers/flox/driver.rs +++ b/src/superviseur/drivers/flox/driver.rs @@ -1,11 +1,13 @@ use std::{ collections::HashMap, io::{BufRead, Write}, + process::{ChildStderr, ChildStdout}, sync::{Arc, Mutex}, thread, }; use anyhow::Error; +use owo_colors::OwoColorize; use tokio::sync::mpsc; use crate::{ @@ -59,11 +61,8 @@ impl Driver { event_tx, } } -} -impl DriverPlugin for Driver { - fn start(&self, project: String) -> Result<(), Error> { - let cfg = self.service.flox.as_ref().unwrap(); + pub fn setup_flox_env(&self, cfg: &Flox) -> Result<(), Error> { std::process::Command::new("sh") .arg("-c") .arg("flox --version") @@ -81,9 +80,52 @@ impl DriverPlugin for Driver { .arg(command) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) - .spawn() - .unwrap(); - child.wait().unwrap(); + .spawn()?; + child.wait()?; + Ok(()) + } + + pub fn write_logs(&self, stdout: ChildStdout, stderr: ChildStderr) { + let cloned_service = self.service.clone(); + + thread::spawn(move || { + let service = cloned_service; + let id = service.id.unwrap_or("-".to_string()); + // write stdout to file + let mut log_file = std::fs::File::create(service.stdout).unwrap(); + + let stdout = std::io::BufReader::new(stdout); + for line in stdout.lines() { + let line = line.unwrap(); + let line = format!("{}\n", line); + SimpleBroker::publish(TailLogStream { + id: id.clone(), + line: line.clone(), + }); + SimpleBroker::publish(LogStream { + id: id.clone(), + line: line.clone(), + }); + let service_name = format!("{} | ", service.name); + print!("{} {}", service_name.cyan(), line); + log_file.write_all(line.as_bytes()).unwrap(); + } + + // write stderr to file + let mut err_file = std::fs::File::create(service.stderr).unwrap(); + let stderr = std::io::BufReader::new(stderr); + for line in stderr.lines() { + let line = line.unwrap(); + err_file.write_all(line.as_bytes()).unwrap(); + } + }); + } +} + +impl DriverPlugin for Driver { + fn start(&self, project: String) -> Result<(), Error> { + let cfg = self.service.flox.as_ref().unwrap(); + self.setup_flox_env(cfg)?; let command = format!( "flox activate -e {} -- {}", @@ -100,8 +142,7 @@ impl DriverPlugin for Driver { .envs(envs) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) - .spawn() - .unwrap(); + .spawn()?; let mut processes = self.processes.lock().unwrap(); @@ -118,47 +159,14 @@ impl DriverPlugin for Driver { .unwrap() .insert(service_key, process.pid.unwrap() as i32); - self.event_tx - .send(ProcessEvent::Started( - self.service.name.clone(), - project.clone(), - )) - .unwrap(); + self.event_tx.send(ProcessEvent::Started( + self.service.name.clone(), + project.clone(), + ))?; let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); - let cloned_service = self.service.clone(); - - thread::spawn(move || { - let service = cloned_service; - let id = service.id.unwrap_or("-".to_string()); - // write stdout to file - let mut log_file = std::fs::File::create(service.stdout).unwrap(); - - let stdout = std::io::BufReader::new(stdout); - for line in stdout.lines() { - let line = line.unwrap(); - let line = format!("{}\n", line); - SimpleBroker::publish(TailLogStream { - id: id.clone(), - line: line.clone(), - }); - SimpleBroker::publish(LogStream { - id: id.clone(), - line: line.clone(), - }); - log_file.write_all(line.as_bytes()).unwrap(); - } - - // write stderr to file - let mut err_file = std::fs::File::create(service.stderr).unwrap(); - let stderr = std::io::BufReader::new(stderr); - for line in stderr.lines() { - let line = line.unwrap(); - err_file.write_all(line.as_bytes()).unwrap(); - } - }); - + self.write_logs(stdout, stderr); Ok(()) } @@ -241,4 +249,37 @@ impl DriverPlugin for Driver { fn exec(&self) -> Result<(), Error> { Ok(()) } + + fn build(&self, project: String) -> Result<(), Error> { + if let Some(build) = self.service.build.clone() { + let envs = self.service.env.clone(); + let working_dir = self.service.working_dir.clone(); + let cfg = self.service.flox.as_ref().unwrap(); + self.setup_flox_env(cfg)?; + + let build_command = + format!("flox activate -e {} -- {}", cfg.environment, build.command); + println!("build_command: {}", build_command); + let mut child = std::process::Command::new("sh") + .arg("-c") + .arg(build_command) + .current_dir(working_dir) + .envs(envs) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn()?; + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + self.write_logs(stdout, stderr); + child.wait()?; + + self.event_tx + .send(ProcessEvent::Built( + self.service.name.clone(), + project.clone(), + )) + .unwrap(); + } + Ok(()) + } } diff --git a/src/superviseur/drivers/mod.rs b/src/superviseur/drivers/mod.rs index b4e1bd7..43257fe 100644 --- a/src/superviseur/drivers/mod.rs +++ b/src/superviseur/drivers/mod.rs @@ -11,4 +11,5 @@ pub trait DriverPlugin: DynClone { fn status(&self) -> Result<(), Error>; fn logs(&self) -> Result<(), Error>; fn exec(&self) -> Result<(), Error>; + fn build(&self, project: String) -> Result<(), Error>; } diff --git a/src/types/configuration.rs b/src/types/configuration.rs index 9ef3396..f6a6f09 100644 --- a/src/types/configuration.rs +++ b/src/types/configuration.rs @@ -39,6 +39,8 @@ pub struct Service { pub wait_for: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub flox: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub build: Option, } #[derive(Serialize, Deserialize, Debug, Default, Clone)] @@ -51,3 +53,8 @@ pub struct ConfigurationData { pub struct Flox { pub environment: String, } + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Build { + pub command: String, +} diff --git a/src/types/process.rs b/src/types/process.rs index d956edb..7f1750e 100644 --- a/src/types/process.rs +++ b/src/types/process.rs @@ -22,6 +22,7 @@ pub enum State { Locked, WaitingForCpu, Unknown, + Building, } impl Default for State { @@ -71,6 +72,7 @@ impl Display for State { State::Locked => write!(f, "Locked"), State::WaitingForCpu => write!(f, "WaitingForCpu"), State::Unknown => write!(f, "Unknown"), + State::Building => write!(f, "Building"), } } }