Skip to content

Commit

Permalink
fix: fix blocking on plugin at online
Browse files Browse the repository at this point in the history
  • Loading branch information
SkuldNorniern committed May 7, 2024
1 parent 49a0c43 commit 085918c
Showing 1 changed file with 42 additions and 14 deletions.
56 changes: 42 additions & 14 deletions src/net/online_fluereflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {

let mut records: Vec<FluereRecord> = Vec::new();
let mut active_flow: HashMap<Key, FluereRecord> = HashMap::new();
let mut tasks = vec![];
let mut export_tasks = vec![];
// let mut packet_count = 0;

// let export_rt =
loop {
match cap.next_packet() {
Err(_) => {
Expand Down Expand Up @@ -187,29 +189,50 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
// Export flows if the interval has been reached
if last_export.elapsed() >= Duration::from_millis(interval) && interval != 0 {
let mut expired_flows = vec![];
let mut expired_flow_data = vec![];
// packet_count = 0;
debug!("Calculating timeout start");
for (key, flow) in active_flow.iter() {
if flow_timeout > 0 && flow.last < (time - (flow_timeout * 1000)) {
trace!("flow expired");
trace!("flow data: {:?}", flow);

plugin_manager.process_flow_data(*flow).await.unwrap();
// plugin_manager.process_flow_data(*flow).await.unwrap();
records.push(*flow);
expired_flows.push(*key);
expired_flow_data.push(*flow);
}
}

// Send the expired flows to the plugins
debug!(
"Sending {} expired flows to plugins start",
expired_flows.len()
);

let cloned_plugin_manager = plugin_manager.clone();

Check failure on line 213 in src/net/online_fluereflow.rs

View workflow job for this annotation

GitHub Actions / clippy

no method named `clone` found for struct `fluere_plugin::PluginManager` in the current scope

error[E0599]: no method named `clone` found for struct `fluere_plugin::PluginManager` in the current scope --> src/net/online_fluereflow.rs:213:64 | 213 | let cloned_plugin_manager = plugin_manager.clone(); | ^^^^^ method not found in `PluginManager`
tasks.push(task::spawn(async move {
for flow in &expired_flow_data {
cloned_plugin_manager.process_flow_data(*flow).await.unwrap();
}
debug!("Sending {} expired flows to plugins done", expired_flow_data.len());
}));

active_flow.retain(|key, _| !expired_flows.contains(key));
let cloned_records = records.clone();
records.clear();
debug!("Calculating timeout done");

//let file_path_clone = file_path.clone();
let file_path_clone = file_path.clone();
//let file = fs::File::create(file_path_clone).unwrap();
let tasks = task::spawn(async {
fluere_exporter(cloned_records, file);
});
info!("Export {} Started", file_path_clone);
export_tasks.push(task::spawn(async move {
fluere_exporter(cloned_records, file).await;
info!("Export {} Finished", file_path_clone);
}));

let result = tasks.await;
info!("Export {} result: {:?}", file_path, result);
// let result = tasks.await;
info!("running without blockng");
file_path = cur_time_file(csv_file.as_str(), file_dir, ".csv");
file = fs::File::create(file_path.as_ref()).unwrap();
last_export = Instant::now();
Expand Down Expand Up @@ -237,15 +260,20 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
plugin_manager.process_flow_data(*flow).await.unwrap();
records.push(*flow);
}
let cloned_records = records.clone();
let tasks = task::spawn(async {
fluere_exporter(cloned_records, file);
});
for task in tasks {
let _ = task.await;
}

let cloned_records = records.clone();
export_tasks.push(task::spawn(async {
fluere_exporter(cloned_records, file).await;
}));
plugin_manager.await_completion(plugin_worker).await;
drop(plugin_manager);
let result = tasks.await;
info!("Exporting task excutation result: {:?}", result);
for task in export_tasks {
let _ = task.await;
}
// info!("Exporting task excutation result: {:?}", result);

Ok(())
}

0 comments on commit 085918c

Please sign in to comment.