From 085918c43b7816d6f355c72aa0b607e8e81424ec Mon Sep 17 00:00:00 2001 From: SkuldNorniern Date: Tue, 7 May 2024 17:12:09 +0900 Subject: [PATCH] fix: fix blocking on plugin at `online` --- src/net/online_fluereflow.rs | 56 +++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/src/net/online_fluereflow.rs b/src/net/online_fluereflow.rs index 7e4de06..b4358b2 100644 --- a/src/net/online_fluereflow.rs +++ b/src/net/online_fluereflow.rs @@ -76,8 +76,10 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { let mut records: Vec = Vec::new(); let mut active_flow: HashMap = HashMap::new(); + let mut tasks = vec![]; + let mut export_tasks = vec![]; // let mut packet_count = 0; - + // let export_rt = loop { match cap.next_packet() { Err(_) => { @@ -187,29 +189,50 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { // Export flows if the interval has been reached if last_export.elapsed() >= Duration::from_millis(interval) && interval != 0 { let mut expired_flows = vec![]; + let mut expired_flow_data = vec![]; // packet_count = 0; + debug!("Calculating timeout start"); for (key, flow) in active_flow.iter() { if flow_timeout > 0 && flow.last < (time - (flow_timeout * 1000)) { trace!("flow expired"); trace!("flow data: {:?}", flow); - plugin_manager.process_flow_data(*flow).await.unwrap(); + // plugin_manager.process_flow_data(*flow).await.unwrap(); records.push(*flow); expired_flows.push(*key); + expired_flow_data.push(*flow); } } + + // Send the expired flows to the plugins + debug!( + "Sending {} expired flows to plugins start", + expired_flows.len() + ); + + let cloned_plugin_manager = plugin_manager.clone(); + tasks.push(task::spawn(async move { + for flow in &expired_flow_data { + cloned_plugin_manager.process_flow_data(*flow).await.unwrap(); + } + debug!("Sending {} expired flows to plugins done", expired_flow_data.len()); + })); + active_flow.retain(|key, _| !expired_flows.contains(key)); let cloned_records = records.clone(); records.clear(); + debug!("Calculating timeout done"); - //let file_path_clone = file_path.clone(); + let file_path_clone = file_path.clone(); //let file = fs::File::create(file_path_clone).unwrap(); - let tasks = task::spawn(async { - fluere_exporter(cloned_records, file); - }); + info!("Export {} Started", file_path_clone); + export_tasks.push(task::spawn(async move { + fluere_exporter(cloned_records, file).await; + info!("Export {} Finished", file_path_clone); + })); - let result = tasks.await; - info!("Export {} result: {:?}", file_path, result); + // let result = tasks.await; + info!("running without blockng"); file_path = cur_time_file(csv_file.as_str(), file_dir, ".csv"); file = fs::File::create(file_path.as_ref()).unwrap(); last_export = Instant::now(); @@ -237,15 +260,20 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { plugin_manager.process_flow_data(*flow).await.unwrap(); records.push(*flow); } - let cloned_records = records.clone(); - let tasks = task::spawn(async { - fluere_exporter(cloned_records, file); - }); + for task in tasks { + let _ = task.await; + } + let cloned_records = records.clone(); + export_tasks.push(task::spawn(async { + fluere_exporter(cloned_records, file).await; + })); plugin_manager.await_completion(plugin_worker).await; drop(plugin_manager); - let result = tasks.await; - info!("Exporting task excutation result: {:?}", result); + for task in export_tasks { + let _ = task.await; + } + // info!("Exporting task excutation result: {:?}", result); Ok(()) }