From 610dbefdf06bf66e98852bd71ab1922e6775c0e6 Mon Sep 17 00:00:00 2001 From: akitaSummer Date: Thu, 25 Jan 2024 01:51:52 +0800 Subject: [PATCH] fix: update uds --- integration/fixtures/utils/pids.js | 37 ++++----- integration/index.2.test.js | 31 ++++--- package.json | 2 +- packages/cli/lib/deamon.js | 10 +-- packages/deamon/Cargo.toml | 6 +- packages/deamon/src/config.rs | 102 +++++++++++++++++++---- packages/deamon/src/main.rs | 23 +++-- packages/deamon/src/pid.rs | 54 ++++++++---- packages/deamon/src/server.rs | 129 +++++++++++++++++++++-------- 9 files changed, 275 insertions(+), 119 deletions(-) diff --git a/integration/fixtures/utils/pids.js b/integration/fixtures/utils/pids.js index dfe6af7..4e41408 100644 --- a/integration/fixtures/utils/pids.js +++ b/integration/fixtures/utils/pids.js @@ -20,32 +20,25 @@ class Pids { try { const snapshot = await this.getPsSnapshot(); - - let overlayPattern; - - let nfsPattern; - - if (process.platform === 'linux') { - overlayPattern = new RegExp(`overlay.*?${this.nodeModulesDir}`, 'i'); - } else if (process.platform === 'darwin') { - overlayPattern = new RegExp(`unionfs.*?${this.nodeModulesDir}`, 'i'); - nfsPattern = new RegExp( + console.log(snapshot); + if (process.platform === 'darwin') { + const overlayPattern = new RegExp(`unionfs.*?${this.nodeModulesDir}`, 'i'); + const nfsPattern = new RegExp( `/usr/local/bin/go-nfsv4.*?${this.nodeModulesDir}`, 'i' ); - } - console.log('snapshot', snapshot); - - for (const line of snapshot.split('\n')) { - if (overlayPattern.test(line)) { - const fields = line.split(/\s+/); - if (fields.length >= 11) { - const pid = parseInt(fields[1], 10) || 0; - pids.push(pid); + for (const line of snapshot.split('\n')) { + if (overlayPattern.test(line)) { + const fields = line.split(/\s+/); + console.log(fields); + if (fields.length >= 11) { + const pid = parseInt(fields[1], 10) || 0; + pids.push(pid); + } } - } - if (process.platform === 'darwin') { + if (nfsPattern.test(line)) { const fields = line.split(/\s+/); + console.log(fields); if (fields.length >= 11) { const pid = parseInt(fields[1], 10) || 0; pids.push(pid); @@ -61,4 +54,4 @@ class Pids { } } -exports.Pids = Pids; \ No newline at end of file +exports.Pids = Pids; diff --git a/integration/index.2.test.js b/integration/index.2.test.js index 9abc55a..5c37939 100644 --- a/integration/index.2.test.js +++ b/integration/index.2.test.js @@ -6,7 +6,7 @@ const assert = require('node:assert'); const coffee = require('coffee'); const semver = require('semver'); const execa = require('execa'); -const { setTimeout } = require('node:timers/promises'); +const { setTimeout: setTimeoutPromise } = require('node:timers/promises'); const rapid = path.join(__dirname, '../node_modules/.bin/rapid'); const { Pids } = require(path.join(__dirname, './fixtures/utils/pids')); const { @@ -225,19 +225,24 @@ describe('test/index.v2.test.js', () => { await assert.doesNotReject(fs.stat(path.join(cwd, 'node_modules/esbuild'))); assert.strictEqual(require(path.join(cwd, 'node_modules', 'esbuild/package.json')).version, '0.15.14'); - const pidsInstance = new Pids('esbuild/node_modules'); - let pids = await pidsInstance.getPids(); - assert(pids.length > 0); - for (const pid of pids) { - await execa.command(`kill -9 ${pid}`); + if (process.platform === 'linux') { + const nodeModulesDir = path.join(cwd, 'node_modules'); + await execa.command(`umount -f ${nodeModulesDir}`); + await setTimeoutPromise(10000); } - await new Promise(resolve => { - setTimeout(() => { - resolve(); - }, 10000); - }); - pids = await pidsInstance.getPids(); - assert(pids.length > 0); + + if (process.platform === 'darwin') { + const pidsInstance = new Pids('esbuild/node_modules'); + let pids = await pidsInstance.getPids(); + assert(pids.length > 0); + for (const pid of pids) { + await execa.command(`kill -9 ${pid}`); + } + await setTimeoutPromise(10000); + pids = await pidsInstance.getPids(); + assert(pids.length > 0); + } + assert.strictEqual(require(path.join(cwd, 'node_modules', 'esbuild/package.json')).version, '0.15.14'); }); }); diff --git a/package.json b/package.json index 562c2c0..7428ec7 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,6 @@ "version": "lerna version --conventional-commits --sign-git-commit --sign-git-tag --no-push --no-private" }, "devDependencies": { - "lerna": "^7.1.4", "@eggjs/tsconfig": "^1.0.0", "@types/mocha": "^8.2.0", "@types/node": "^18.16.3", @@ -38,6 +37,7 @@ "eslint-config-egg": "^12.0.0", "espower-typescript": "^9.0.2", "intelli-espower-loader": "^1.0.1", + "lerna": "^7.1.4", "mm": "^2.2.0", "mocha": "^8.2.1", "nyc": "^15.1.0", diff --git a/packages/cli/lib/deamon.js b/packages/cli/lib/deamon.js index edaf92d..61bd8c2 100644 --- a/packages/cli/lib/deamon.js +++ b/packages/cli/lib/deamon.js @@ -19,7 +19,7 @@ const deamonDir = path.join(baseRapidModeDir(), 'project'); const metadataDir = path.join(deamonDir, 'metadata'); -// const deamonSocketPath = path.join(deamonDir, 'socket_path'); +const deamonSocketPath = path.join(deamonDir, 'socket_path'); const rapidDeamon = rsBindingPath ? path.join(rsBindingPath, 'rapid_deamon') @@ -38,7 +38,7 @@ const checkDeamonAlive = async () => { try { const result = await urllib.request(`${aliveUrl}`, { method: 'GET', - // socketPath: deamonSocketPath, + socketPath: deamonSocketPath, timeout: 1000, }); return result.status === 200; @@ -73,7 +73,7 @@ const delProject = async projectName => { data: { projectPath: config.projectPath }, dataType: 'json', contentType: 'json', - // socketPath: deamonSocketPath, + socketPath: deamonSocketPath, }); return result.status === 200 && result.data?.code === 0; } catch (error) { @@ -91,7 +91,7 @@ const addProject = async config => { data: config, dataType: 'json', contentType: 'json', - // socketPath: deamonSocketPath, + socketPath: deamonSocketPath, }); return result.status === 200 && result.data?.code === 0; } catch (_) { @@ -124,7 +124,7 @@ const killDeamon = async () => { try { const result = await urllib.request(`${killUrl}`, { method: 'GET', - // socketPath: deamonSocketPath, + socketPath: deamonSocketPath, }); return result.status === 200; } catch (_) { diff --git a/packages/deamon/Cargo.toml b/packages/deamon/Cargo.toml index 137d1c4..62a33ab 100644 --- a/packages/deamon/Cargo.toml +++ b/packages/deamon/Cargo.toml @@ -20,15 +20,17 @@ log4rs = "1" axum = "0.7" http-body-util = "0.1" hyper = { version = "0.14", features = ["full"] } +hyper_v1 = { package = "hyper", version = "1", features = ["full"] } hyperlocal = "0.8" -hyper-util = { version = "0.1.2", features = [ +hyper-util = { version = "0.1", features = [ "client", "client-legacy", "http1", + "server-auto", "tokio", ] } httparse = "1.8" +tower = { version = "0.4", features = ["util"] } [dev-dependencies] -tower = { version = "0.4", features = ["util"] } mime = "0.3" diff --git a/packages/deamon/src/config.rs b/packages/deamon/src/config.rs index 5497628..39dfbbd 100644 --- a/packages/deamon/src/config.rs +++ b/packages/deamon/src/config.rs @@ -147,14 +147,12 @@ pub struct Overlay { } impl Overlay { + #[cfg(target_os = "macos")] pub fn get_pids(&self) -> Result> { let mut pids = vec![]; let snapshot = get_ps_snapshot()?; - #[cfg(target_os = "linux")] - let overlay_pattern = Regex::new(&format!(r#"(?i)overlay.*?{}"#, self.node_modules_dir))?; - #[cfg(target_os = "macos")] let unionfs = match &self.unionfs { Some(s) => s, @@ -165,7 +163,7 @@ impl Overlay { )) } }; - #[cfg(target_os = "macos")] + let overlay_pattern = Regex::new(&format!(r#"(?i){}.*?{}"#, unionfs, self.node_modules_dir))?; @@ -183,13 +181,11 @@ impl Overlay { } } - #[cfg(target_os = "macos")] let nfs_pattern = Regex::new(&format!( r#"(?i)/usr/local/bin/go-nfsv4.*?{}"#, self.node_modules_dir ))?; - #[cfg(target_os = "macos")] for line in snapshot.clone().lines() { if nfs_pattern.is_match(line) { let fields: Vec<&str> = line.split_whitespace().collect(); @@ -206,14 +202,18 @@ impl Overlay { Ok(pids) } + + #[cfg(target_os = "linux")] pub fn restart(&self) -> Result> { - #[cfg(target_os = "linux")] + let unmount_modules_str = format!(r#"umount -f {}"#, self.node_modules_dir); + + let _ = start_command(&unmount_modules_str); + + let unmount_overlay_str = format!(r#"umount -f {}"#, self.overlay); + + let _ = start_command(&unmount_overlay_str); + let tmp_str = format!(r#"mount -t tmpfs tmpfs {}"#, self.overlay); - #[cfg(target_os = "macos")] - let tmp_str = format!( - r#"hdiutil attach -nobrowse -mountpoint {} {}"#, - self.overlay, self.tmp_dmg - ); match start_command(&tmp_str) { Ok(output) => { @@ -241,7 +241,6 @@ impl Overlay { } } - #[cfg(target_os = "linux")] let workdir = match &self.workdir { Some(s) => s, None => { @@ -251,12 +250,72 @@ impl Overlay { )) } }; - #[cfg(target_os = "linux")] + let mount_str = format!( r#"mount -t overlay overlay -o lowerdir={},upperdir={},workdir={} {}"#, self.mnt, self.upper, workdir, self.node_modules_dir ); - #[cfg(target_os = "macos")] + match start_command(&mount_str) { + Ok(output) => { + if output.status.success() { + info!( + "Overlay restart executed successfully, mountpoint: {:?}", + self.node_modules_dir + ); + } else { + return Err(anyhow!( + "Error executing Overlay restart: {:?}, mountpoint: {:?}", + output.status, + self.node_modules_dir + )); + } + } + Err(e) => { + return Err(anyhow!( + "Error executing Overlay restart command: {:?}, mountpoint: {:?}", + e, + self.node_modules_dir + )); + } + } + + let res = vec![]; + Ok(res) + } + + #[cfg(target_os = "macos")] + pub fn restart(&self) -> Result> { + let tmp_str = format!( + r#"hdiutil attach -nobrowse -mountpoint {} {}"#, + self.overlay, self.tmp_dmg + ); + + match start_command(&tmp_str) { + Ok(output) => { + if output.status.success() { + info!( + "Overlay restart executed successfully, mountpoint: {:?}, tmp_str: {:?}", + self.node_modules_dir, tmp_str + ); + } else { + return Err(anyhow!( + "Error executing Overlay restart: {:?}, mountpoint: {:?}, tmp_str: {:?}", + output.status, + self.node_modules_dir, + tmp_str + )); + } + } + Err(e) => { + return Err(anyhow!( + "Error executing Overlay restart command: {:?}, mountpoint: {:?}, tmp_str: {:?}", + e, + self.node_modules_dir, + tmp_str + )); + } + } + let unionfs = match &self.unionfs { Some(s) => s, None => { @@ -266,7 +325,7 @@ impl Overlay { )) } }; - #[cfg(target_os = "macos")] + let mount_str = format!( r#"{} -o cow,max_files=32768 -o allow_other,use_ino,suid,dev,nobrowse {}=RW:{}=RO {}"#, unionfs, self.upper, self.mnt, self.node_modules_dir @@ -296,7 +355,8 @@ impl Overlay { } } - Ok(self.get_pids()?) + let res = self.get_pids()?; + Ok(res) } } @@ -373,6 +433,7 @@ impl ProjectConfig { pub fn get_pids(&self) -> Result> { let mut pids = vec![]; + #[cfg(target_os = "macos")] for overlay in self.overlays.iter() { let ps = overlay.get_pids()?; pids.extend(ps); @@ -472,7 +533,12 @@ impl NydusConfig { info!("init_daemon executed successfully"); return Ok(()); } else { - error!("Error executing init_daemon: {:?}", output.status); + error!( + "Error executing init_daemon, status: {:?}, stdout: {:?}, stderr: {:?}", + output.status, + std::str::from_utf8(&output.stdout)?, + std::str::from_utf8(&output.stderr)?, + ); } } Err(e) => error!("Error executing init_daemon: {:?}", e), diff --git a/packages/deamon/src/main.rs b/packages/deamon/src/main.rs index 29fb34e..0ed3558 100644 --- a/packages/deamon/src/main.rs +++ b/packages/deamon/src/main.rs @@ -4,6 +4,7 @@ extern crate lazy_static; use anyhow::Result; use config::{process_json_files_in_folder, NydusConfig}; use homedir::get_my_home; +use log::error; use pid::{check_projects, init_projects}; use server::start_server; use tokio::{select, sync::mpsc}; @@ -46,15 +47,21 @@ async fn main() { .unwrap() .join(".rapid/cache/project/metadata/"); - let _ = setup_logger(); - create_folder_if_not_exists(metadata_dir.to_str().unwrap()).unwrap(); + let socket_path = get_my_home() + .unwrap() + .unwrap() + .join(".rapid/cache/project/socket_path"); + + let _ = setup_logger(); + let configs = process_json_files_in_folder(metadata_dir.to_str().unwrap()) .await .unwrap(); let nydus = NydusConfig::new().await; + let _ = nydus.init_daemon(); let project_tree = init_projects(configs).await.unwrap(); @@ -66,20 +73,22 @@ async fn main() { std::thread::spawn(move || { tokio::runtime::Runtime::new() .unwrap() - .block_on(start_server(project_tree.clone(), sender)); + .block_on(start_server(project_tree.clone(), sender, socket_path)); }); } - println!("deamon main is ready"); - loop { select! { _ = receiver.recv() => { return; } _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => { - let _ = nydus.init_daemon(); - let _ = check_projects(project_tree.clone()).await; + if let Err(e) = nydus.init_daemon() { + error!("init_daemon err: {}", e); + }; + if let Err(e) = check_projects(project_tree.clone()).await { + error!("check_projects err: {}", e); + } } } } diff --git a/packages/deamon/src/pid.rs b/packages/deamon/src/pid.rs index d3bf9b1..6a8e480 100644 --- a/packages/deamon/src/pid.rs +++ b/packages/deamon/src/pid.rs @@ -1,11 +1,12 @@ use super::config::ProjectConfig; -use anyhow::Result; +use anyhow::{anyhow, Result}; use log::{debug, error, info}; use nix::sys::signal::{self, Signal}; use nix::unistd; use std::sync::Arc; use std::{collections::BTreeMap, process::Command}; use tokio::{ + fs::read_dir, sync::Mutex, time::{timeout, Duration}, }; @@ -17,7 +18,7 @@ pub struct ProjectInfo { } impl ProjectInfo { - pub fn kill_pids(&mut self) { + pub fn kill_pids(&mut self) -> Result<()> { for pid in self.pids.iter() { info!( "{:?} be killing, project path {}", @@ -33,15 +34,21 @@ impl ProjectInfo { ); } Err(err) => { - error!("Failed to kill process with PID {}. Error: {:?}", pid, err); + return Err(anyhow!( + "Failed to kill process with PID {}. Error: {:?}", + pid, + err + )); } } } self.pids = vec![]; + + Ok(()) } - async fn restart(&mut self) { + async fn restart(&mut self) -> Result<()> { match self.config.restart().await { Ok(pids) => { info!( @@ -50,13 +57,14 @@ impl ProjectInfo { pids ); self.pids = pids; + return Ok(()); } Err(err) => { - error!( + return Err(anyhow!( "Failed to restart project path {}. Error: {:?}", self.config.get_project_path(), err - ); + )); } }; } @@ -79,12 +87,14 @@ pub async fn init_projects( Ok(lock_map) } -pub async fn kill_projects(process_map: Arc>>) { +pub async fn kill_projects(process_map: Arc>>) -> Result<()> { let mut map = process_map.lock().await; for (_, info) in map.iter_mut() { - info.kill_pids(); + info.kill_pids()?; } + + Ok(()) } pub async fn init_project( @@ -125,7 +135,9 @@ pub async fn check_projects(process_map: Arc for project in p_map.iter_mut() { info!("{:?} be checked", project.0); - let _ = check_project(project.1).await; + if let Err(e) = check_project(project.1).await { + error!("project {} err: {}", project.0, e); + } } Ok(()) @@ -137,12 +149,16 @@ async fn check_project(project: &mut ProjectInfo) -> Result<()> { project.config.get_project_path(), &project.pids ); + + #[cfg(target_os = "macos")] if (&project.pids).len() == 0 { - project.restart().await; - } else if !is_alive(&project.pids, project.config.get_project_path()).await { + project.restart().await?; + return Ok(()); + } + if !is_alive(&project.pids, project.config.get_project_path()).await { info!("{:?} will be check", project.config.get_project_path()); - project.kill_pids(); - project.restart().await; + project.kill_pids()?; + project.restart().await?; } Ok(()) @@ -180,10 +196,14 @@ async fn is_alive(pids: &Vec, directory_path: &str) -> bool { } } -async fn read_directory(path: &str) -> Result> { - let entries = std::fs::read_dir(path)? - .filter_map(|entry| entry.ok()) // Ignore potential errors - .collect(); +async fn read_directory(path: &str) -> Result> { + let mut dir = read_dir(path).await?; + + let mut entries = vec![]; + + while let Some(entry) = dir.next_entry().await? { + entries.push(entry); + } Ok(entries) } diff --git a/packages/deamon/src/server.rs b/packages/deamon/src/server.rs index ef5dd23..2093554 100644 --- a/packages/deamon/src/server.rs +++ b/packages/deamon/src/server.rs @@ -1,16 +1,25 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, convert::Infallible, path::PathBuf, sync::Arc}; use anyhow::Result; use axum::{ - extract::State, - http::StatusCode, + extract::{connect_info, State}, + http::{Request, StatusCode}, response::Json, routing::{get, post}, Router, }; -use log::info; +use hyper_util::{ + rt::{TokioExecutor, TokioIo}, + server, +}; +use hyper_v1::body::Incoming; +use log::{error, info}; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc::Sender, Mutex}; +use tokio::{ + net::{unix::UCred, UnixListener, UnixStream}, + sync::{mpsc::Sender, Mutex}, +}; +use tower::Service; use crate::{ config::ProjectConfig, @@ -25,7 +34,7 @@ struct Res { struct RouteState { pub project_tree: Arc>>, - pub sender: Sender, + pub sender: Arc>>, } async fn handle_add( @@ -63,7 +72,7 @@ async fn handle_del( info!("handle del project path is {}", request.project_path); if let Some(mut project) = project_tree.remove(&request.project_path) { - project.kill_pids(); + // project.kill_pids(); } Ok(Json(Res { @@ -85,14 +94,23 @@ async fn handle_kill( ) -> Result, (StatusCode, String)> { info!("echo kill"); let state = state.lock().await; - let _ = state.sender.send(0).await; - Ok(Json(Res { - code: 0, - msg: format!("deamon is alive"), - })) + let sender = state.sender.lock().await; + match sender.send(0).await { + Ok(_) => Ok(Json(Res { + code: 0, + msg: format!("deamon be killed"), + })), + Err(e) => Ok(Json(Res { + code: -1, + msg: format!("deamon kill err {:?}", e), + })), + } } -fn app(project_tree: Arc>>, sender: Sender) -> Router { +fn app( + project_tree: Arc>>, + sender: Arc>>, +) -> Router { let state = RouteState { project_tree, sender, @@ -105,37 +123,76 @@ fn app(project_tree: Arc>>, sender: Sender, + peer_cred: UCred, +} + +impl connect_info::Connected<&UnixStream> for UdsConnectInfo { + fn connect_info(target: &UnixStream) -> Self { + let peer_addr = target.peer_addr().unwrap(); + let peer_cred = target.peer_cred().unwrap(); + + Self { + peer_addr: Arc::new(peer_addr), + peer_cred, + } + } +} + +fn unwrap_infallible(result: Result) -> T { + match result { + Ok(value) => value, + Err(err) => { + error!("unwrap_infallible err {:?}", err); + match err {} + } + } +} + pub async fn start_server( project_tree: Arc>>, sender: Sender, + socket_path: PathBuf, ) { - let app = app(project_tree, sender); - - let mut port = 33889; + let sender = Arc::new(Mutex::new(sender)); + let app = app(project_tree, sender.clone()); - const MAX_RETRIES: usize = 100; - let mut retry_count = 0; + let _ = tokio::fs::remove_file(&socket_path).await; - let listener = loop { - match tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)).await { - Ok(listener) => { - info!("deamon server is ready on port {}", port); - break listener; - } - Err(_) => { - info!("Port {} is already in use, trying the next one", port); - port += 1; - retry_count += 1; - if retry_count >= MAX_RETRIES { - panic!("Exceeded maximum retry limit"); - } - } + let uds = match UnixListener::bind(socket_path.clone()) { + Ok(uds) => uds, + Err(e) => { + error!("create uds error: {:?}", e); + sender.lock().await.send(0).await; + return; } }; - println!("deamon server is ready"); + let mut make_service = app.into_make_service_with_connect_info::(); + + loop { + let (socket, _remote_addr) = uds.accept().await.unwrap(); + + let tower_service = unwrap_infallible(make_service.call(&socket).await); - axum::serve(listener, app).await.expect("Server failed"); + tokio::spawn(async move { + let socket = TokioIo::new(socket); + + let hyper_service = hyper_v1::service::service_fn(move |request: Request| { + tower_service.clone().call(request) + }); + + if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new()) + .serve_connection_with_upgrades(socket, hyper_service) + .await + { + eprintln!("failed to serve connection: {err:#}"); + } + }); + } } #[cfg(test)] @@ -179,6 +236,7 @@ mod test { drop(tree_config); let (sender, _) = mpsc::channel::(1); + let sender = Arc::new(Mutex::new(sender)); let app = app(tree.clone(), sender); let response = app @@ -203,6 +261,7 @@ mod test { let tree = create_mock_tree().await; let (sender, _) = mpsc::channel::(1); + let sender = Arc::new(Mutex::new(sender)); let app = app(tree.clone(), sender); let response = app @@ -227,6 +286,7 @@ mod test { let tree = create_mock_tree().await; let (sender, _) = mpsc::channel::(1); + let sender = Arc::new(Mutex::new(sender)); let app = app(tree.clone(), sender); let response = app @@ -251,6 +311,7 @@ mod test { let tree = create_mock_tree().await; let (sender, mut receiver) = mpsc::channel::(1); + let sender = Arc::new(Mutex::new(sender)); let app = app(tree.clone(), sender); let response = app