diff --git a/src-tauri/config/conf-default.toml b/src-tauri/config/conf-default.toml index a819eac..9d640bc 100644 --- a/src-tauri/config/conf-default.toml +++ b/src-tauri/config/conf-default.toml @@ -1,4 +1,8 @@ [fw.app] id = "file-processor" name = "file-processor" -version = "0.1.0" \ No newline at end of file +version = "0.1.0" + +[cs] +[csm.processor] +concurrent = 4 diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 55e1411..69ba58a 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -10,6 +10,7 @@ use tardis::config::config_dto::TardisConfig; #[cfg(debug_assertions)] use tardis::TardisFuns; use tardis::{basic::result::TardisResult, tokio}; +mod processor_config; mod tauri; mod uploader; diff --git a/src-tauri/src/processor_config.rs b/src-tauri/src/processor_config.rs new file mode 100644 index 0000000..e6d763f --- /dev/null +++ b/src-tauri/src/processor_config.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +pub const DOMAIN_CODE: &str = "processor"; + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(default)] +pub struct ProcessorConfig { + pub concurrent: usize, +} + +impl Default for ProcessorConfig { + fn default() -> Self { + ProcessorConfig { concurrent: 5 } + } +} diff --git a/src-tauri/src/uploader.rs b/src-tauri/src/uploader.rs index 63f5692..bef8d34 100644 --- a/src-tauri/src/uploader.rs +++ b/src-tauri/src/uploader.rs @@ -27,7 +27,7 @@ use tardis::{ }; use tauri::{async_runtime::TokioJoinHandle, Emitter as _, Window}; -use crate::FileUploadProcessParams; +use crate::{processor_config::{ProcessorConfig, DOMAIN_CODE}, FileUploadProcessParams}; #[derive(Serialize, Deserialize, Clone)] pub struct UploadProgressResp { @@ -156,35 +156,48 @@ pub async fn upload_files( let base_path = origin_path.parent().unwrap_or(Path::new("")); let paths = async_get_files(&file_uri).await?; for path in paths { - let mime_type = mime_infer::from_path(path.clone()).first_or_text_plain(); - let file = File::open(path.clone()) - .await - .map_err(|e| TardisError::io_error(&format!("io error:{e}"), "error"))?; let relative_path = path .strip_prefix(&base_path) .map_err(|e| TardisError::io_error(&format!("io error:{e}"), "error"))?; - let size; - #[cfg(any(target_os = "macos", target_os = "linux"))] - { - size = file.metadata().await?.size(); - } - #[cfg(target_os = "windows")] - { - size = file.metadata().await?.file_size(); - } - let info = UploadFileInfo { - name: path - .file_name() - .and_then(|s| s.to_str()) - .unwrap_or_default() - .to_string(), - relative_path: relative_path.to_path_buf(), - size, - mime_type: mime_type.to_string(), - id: random::().to_string(), - }; + if path.is_file() { + let mime_type = mime_infer::from_path(path.clone()).first_or_text_plain(); + let file = File::open(path.clone()) + .await + .map_err(|e| TardisError::io_error(&format!("io error:{e}"), "error"))?; + let size; + #[cfg(any(target_os = "macos", target_os = "linux"))] + { + size = file.metadata().await?.size(); + } + #[cfg(target_os = "windows")] + { + size = file.metadata().await?.file_size(); + } + let info = UploadFileInfo { + name: path + .file_name() + .and_then(|s| s.to_str()) + .unwrap_or_default() + .to_string(), + relative_path: relative_path.to_path_buf(), + size, + mime_type: mime_type.to_string(), + id: random::().to_string(), + }; - files.push((file, info)); + files.push((Some(file), info)); + } else { + files.push(( + None, + UploadFileInfo { + name: "".to_string(), + relative_path: relative_path.to_path_buf(), + size: 0, + mime_type: "dir".to_string(), + id: random::().to_string(), + }, + )); + } } } total_file_numbers = files.len(); @@ -194,7 +207,6 @@ pub async fn upload_files( .await .into_iter() .sum(); - let back_task; if param.title.eq("请按使用文档调用(以下为示例)") { //mock @@ -219,7 +231,7 @@ pub async fn upload_files( } async fn mock_backend_task( - files: Vec<(File, UploadFileInfo)>, + files: Vec<(Option, UploadFileInfo)>, total_file_numbers: usize, total_file_size: u64, window: Window, @@ -287,9 +299,9 @@ async fn mock_backend_task( current_files: vec![], fail_files: vec![], success_files: if last_file.is_some() { - vec![] - } else { vec![last_file.unwrap()] + } else { + vec![] }, }, ) @@ -297,7 +309,7 @@ async fn mock_backend_task( } async fn backend_task( - files: Vec<(File, UploadFileInfo)>, + files: Vec<(Option, UploadFileInfo)>, total_file_numbers: usize, total_file_size: u64, window: Window, @@ -309,10 +321,10 @@ async fn backend_task( // first boolean means end(true)/start // seconde boolean is success(true)/fail let (tx, mut rx) = mpsc::channel(50); - let max_concurrent_tasks = 2; + let max_concurrent_tasks = TardisFuns::cs_config::(DOMAIN_CODE).concurrent; let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks)); - for (mut file, info) in files { + for (file, info) in files { tardis::tokio::task::yield_now().await; let n_tx = tx.clone(); let config = config.clone(); @@ -322,36 +334,48 @@ async fn backend_task( let permit = semaphore.acquire_owned().await.unwrap(); let _ = n_tx.send(((false, false), info.clone())).await; let body = info.clone().to_body(&config).unwrap(); - info!("file====body:{}", body); - if let Ok(upload_metadata_result) = TardisFuns::web_client() - .post_obj_to_str( - config.upload_metadata_url, - &body, - config.upload_fixed_headers.unwrap_or_default(), - ) - .await - { - info!("upload_metadata_result=====:{:?}", upload_metadata_result); - if upload_metadata_result.code == 200 { - if let Some(upload_url) = upload_metadata_result.body { - info!("upload_url=====:{:?}", upload_url); - if reqwest::Url::parse(&upload_url).is_err() { - let _ = n_tx.send(((true, false), info.clone())).await; - return; - } + info!("file.body:{}", body); + if let Some(mut file) = file { + if let Ok(upload_metadata_result) = TardisFuns::web_client() + .post_obj_to_str( + config.upload_metadata_url, + &body, + config.upload_fixed_headers.unwrap_or_default(), + ) + .await + { + info!("upload_metadata_result:{:?}", upload_metadata_result); + if upload_metadata_result.code == 200 { + if let Some(upload_url) = upload_metadata_result.body { + info!("upload_url:{:?}", upload_url); + if reqwest::Url::parse(&upload_url).is_err() { + let _ = n_tx.send(((true, false), info.clone())).await; + return; + } - let mut content = vec![]; - let _ = file.read_to_end(&mut content).await; - let client = reqwest::Client::new(); - if let Ok(_) = client.put(upload_url).body(content).send().await { - let _ = n_tx.send(((true, true), info.clone())).await; - return; + let mut content = vec![]; + let _ = file.read_to_end(&mut content).await; + let client = reqwest::Client::new(); + if let Ok(_) = client.put(upload_url).body(content).send().await { + let _ = n_tx.send(((true, true), info.clone())).await; + return; + } } } - } - }; - let _ = n_tx.send(((true, false), info.clone())).await; - drop(permit); + }; + let _ = n_tx.send(((true, false), info.clone())).await; + drop(permit); + } else { + //empty dir + let _ = TardisFuns::web_client() + .post_obj_to_str( + config.upload_metadata_url, + &body, + config.upload_fixed_headers.unwrap_or_default(), + ) + .await; + let _ = n_tx.send(((true, true), info.clone())).await; + } }); } @@ -406,8 +430,12 @@ async fn backend_task( .unwrap(); } -async fn get_metadata_size(file: &File) -> u64 { - file.metadata().await.map(|md| md.len()).unwrap_or_default() +async fn get_metadata_size(file: &Option) -> u64 { + if let Some(file) = file { + file.metadata().await.map(|md| md.len()).unwrap_or_default() + } else { + 0 + } } async fn async_get_files(file_uri: &str) -> TardisResult> { @@ -417,13 +445,21 @@ async fn async_get_files(file_uri: &str) -> TardisResult> { result.push(path); } else { let mut dir = read_dir(file_uri).await.expect("can't open dir"); + result.push(path); + let mut push_dir = true; while let Some(d) = dir .next_entry() .await .map_err(|e| TardisError::io_error(&format!("io error:{e}"), "error"))? { match d.path().to_str() { - Some(path) => result.append(&mut Box::pin(async_get_files(path)).await?), + Some(path) => { + if push_dir { + result.remove(result.len() - 1); + push_dir = false + } + result.append(&mut Box::pin(async_get_files(path)).await?); + } None => continue, }; }