Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

182 eldritch sleep blocks imix agent #208

Merged
merged 7 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion implants/golem/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ fn main() -> anyhow::Result<()> {
tome_files_and_content.push( (tome_path, tome_contents) )
}

let runtime = tokio::runtime::Builder::new_current_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
Expand Down
138 changes: 97 additions & 41 deletions implants/imix/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@ async fn handle_exec_tome(task: GraphQLTask, print_channel_sender: Sender<String

let print_handler = EldritchPrintHandler{ sender: print_channel_sender };

println!("{:?}",task_job.parameters);
// Execute a tome script
let res = match thread::spawn(move || { eldritch_run(tome_name, tome_contents, task_job.parameters, &print_handler) }).join() {
Ok(local_thread_res) => local_thread_res,
Err(_) => todo!(),
};

// let res = eldritch_run(tome_name, tome_contents, task_job.parameters, &print_handler);
match res {
Ok(tome_output) => Ok((tome_output, "".to_string())),
Err(tome_error) => Ok(("".to_string(), tome_error.to_string())),
Expand All @@ -69,7 +68,6 @@ async fn handle_exec_timeout_and_response(task: graphql::GraphQLTask, print_chan

// Define a future for our execution task
let exec_future = handle_exec_tome(task.clone(), print_channel_sender.clone());

// Execute that future with a timeout defined by the timeout argument.
let tome_result = match tokio::time::timeout(timeout_duration, exec_future).await {
Ok(res) => {
Expand All @@ -81,6 +79,10 @@ async fn handle_exec_timeout_and_response(task: graphql::GraphQLTask, print_chan
Err(timer_elapsed) => ("".to_string(), format!("Time elapsed task {} has been running for {} seconds", task.id, timer_elapsed.to_string())),
};

// let tome_result = tokio::task::spawn(exec_future).await??;
// let tome_result = tokio::spawn(exec_future).await??;


print_channel_sender.clone().send(format!("---[RESULT]----\n{}\n---------",tome_result.0))?;
print_channel_sender.clone().send(format!("---[ERROR]----\n{}\n--------",tome_result.1))?;
Ok(())
Expand Down Expand Up @@ -130,7 +132,7 @@ fn get_primary_ip() -> Result<String> {
}
},
Err(e) => {
println!("Error getting primary ip address:\n{e}");
eprintln!("Error getting primary ip address:\n{e}");
"DANGER-UNKNOWN".to_string()
},
};
Expand Down Expand Up @@ -166,7 +168,7 @@ fn get_os_pretty_name() -> Result<String> {

// Async handler for port scanning.
async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
let debug = true;
let debug = false;
let version_string = "v0.1.0";
let config_file = File::open(config_path)?;
let imix_config: imix::Config = serde_json::from_reader(config_file)?;
Expand Down Expand Up @@ -234,71 +236,74 @@ async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
};

loop {
let start_time = Utc::now().time();
// 0. Get loop start time
let loop_start_time = Instant::now();
if debug { println!("Get new tasks"); }
// 1. Pull down new tasks
// 1a) calculate callback uri
let cur_callback_uri = imix_config.callback_config.c2_configs[0].uri.clone();

if debug { println!("[{}]: collecting tasks", (Utc::now().time() - start_time).num_milliseconds()) }
// 1b) Collect new tasks
let new_tasks = match graphql::gql_claim_tasks(cur_callback_uri.clone(), claim_tasks_input.clone()).await {
Ok(tasks) => tasks,
Err(error) => {
if debug {
println!("main_loop: error claiming task\n{:?}", error)
}
if debug { println!("main_loop: error claiming task\n{:?}", error) }
let empty_vec = vec![];
empty_vec
},
};

if debug { println!("Starting {} new tasks", new_tasks.len()); }
if debug { println!("[{}]: Starting {} new tasks", (Utc::now().time() - start_time).num_milliseconds(), new_tasks.len()); }
// 2. Start new tasks
for task in new_tasks {
if debug { println!("Launching:\n{:?}", task.clone().job.unwrap().tome.eldritch); }

let (sender, receiver) = channel::<String>();
let exec_with_timeout = handle_exec_timeout_and_response(task.clone(), sender.clone());
if debug { println!("[{}]: Queueing task {}", (Utc::now().time() - start_time).num_milliseconds(), task.clone().id); }
match all_exec_futures.insert(task.clone().id, ExecTask{
future_join_handle: task::spawn(exec_with_timeout),
start_time: Utc::now(),
graphql_task: task.clone(),
print_reciever: receiver,
}) {
Some(_old_task) => {
if debug {
println!("main_loop: error adding new task. Non-unique taskID\n");
}
if debug {println!("main_loop: error adding new task. Non-unique taskID\n");}
},
None => {}, // Task queued successfully
None => {
if debug {println!("main_loop: Task queued successfully\n");}
}, // Task queued successfully
}
if debug { println!("[{}]: Queued task {}", (Utc::now().time() - start_time).num_milliseconds(), task.clone().id); }
}

if debug { println!("Sleeping"); }
// 3. Sleep till callback time
// time_to_wait - time_elapsed
let time_to_sleep = imix_config.callback_config.interval - loop_start_time.elapsed().as_secs() ;
tokio::time::sleep(std::time::Duration::new(time_to_sleep, 24601)).await;

let time_to_sleep = imix_config.callback_config.interval - loop_start_time.elapsed().as_secs();
if debug { println!("[{}]: Sleeping seconds {}", (Utc::now().time() - start_time).num_milliseconds(), time_to_sleep); }
// tokio::time::sleep(std::time::Duration::new(time_to_sleep, 24601)).await; // This seems to wait for other threads to finish.
std::thread::sleep(std::time::Duration::new(time_to_sleep, 24601)); // This just sleeps our thread.

// :clap: :clap: make new map!
let mut running_exec_futures: HashMap<String, ExecTask> = HashMap::new();

if debug { println!("Checking status"); }
if debug { println!("[{}]: Checking task status", (Utc::now().time() - start_time).num_milliseconds()); }
// Check status & send response
for exec_future in all_exec_futures.into_iter() {
if debug {
println!("{}: {:?}", exec_future.0, exec_future.1.future_join_handle.is_finished());
}
if debug { println!("[{}]: Task # {} is_finished? {}", (Utc::now().time() - start_time).num_milliseconds(), exec_future.0, exec_future.1.future_join_handle.is_finished()); }
let mut res: Vec<String> = vec![];
// Loop over each line of output from the task.
loop {
if debug { println!("Reciveing output"); }
if debug { println!("[{}]: Task # {} recieving output", (Utc::now().time() - start_time).num_milliseconds(), exec_future.0); }
let new_res_line = match exec_future.1.print_reciever.recv_timeout(Duration::from_millis(100)) {
Ok(local_res_string) => local_res_string,
Ok(local_res_string) => {
local_res_string
},
Err(local_err) => {
match local_err.to_string().as_str() {
"channel is empty and sending half is closed" => { break; },
"timed out waiting on channel" => { break; },
_ => eprint!("Error: {}", local_err),
}
break;
Expand Down Expand Up @@ -328,16 +333,14 @@ async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
}
},
};
if debug {
println!("{}", task_response.output);
}
if debug { println!("[{}]: Task {} output: {}", (Utc::now().time() - start_time).num_milliseconds(), exec_future.0, task_response.output); }
let submit_task_result = graphql::gql_post_task_result(cur_callback_uri.clone(), task_response).await;
let _ = match submit_task_result {
Ok(_) => Ok(()), // Currently no reason to save the task since it's the task we just answered.
Err(error) => Err(error),
};

// Only re-insert the runnine exec futures
// Only re-insert the runnine exec futures
if !exec_future.1.future_join_handle.is_finished() {
running_exec_futures.insert(exec_future.0, exec_future.1);
}
Expand All @@ -349,7 +352,6 @@ async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
}
}


pub fn main() -> Result<(), imix::Error> {
let matches = Command::new("imix")
.arg(
Expand All @@ -371,7 +373,7 @@ pub fn main() -> Result<(), imix::Error> {
.get_matches();


let runtime = tokio::runtime::Builder::new_current_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
Expand All @@ -393,7 +395,7 @@ pub fn main() -> Result<(), imix::Error> {
if let Some(config_path) = matches.value_of("config") {
match runtime.block_on(main_loop(config_path.to_string(), false)) {
Ok(_) => {},
Err(error) => println!("Imix mail_loop exited unexpectedly with config: {}\n{}", config_path.to_string(), error),
Err(error) => eprintln!("Imix main_loop exited unexpectedly with config: {}\n{}", config_path.to_string(), error),
}
}
Ok(())
Expand All @@ -414,7 +416,6 @@ mod tests {
let primary_ip_address = match get_primary_ip() {
Ok(local_primary_ip) => local_primary_ip,
Err(local_error) => {
println!("An error occured during testing default_ip:{local_error}");
assert_eq!(false,true);
"DANGER-UNKNOWN".to_string()
},
Expand All @@ -425,7 +426,6 @@ mod tests {
#[test]
fn imix_test_get_os_pretty_name() {
let res = get_os_pretty_name().unwrap();
println!("{res}");
assert!(!res.contains("UNKNOWN"));
}

Expand Down Expand Up @@ -453,7 +453,7 @@ sys.shell(input_params["cmd"])
};


let runtime = tokio::runtime::Builder::new_current_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
Expand All @@ -467,7 +467,6 @@ sys.shell(input_params["cmd"])
let stdout = receiver.recv_timeout(Duration::from_millis(500)).unwrap();
assert_eq!(stdout, "custom_print_handler_test".to_string());

println!("{:?}", result.clone());
let mut bool_res = false;

if cfg!(target_os = "linux") ||
Expand All @@ -487,6 +486,65 @@ sys.shell(input_params["cmd"])

}


#[test]
fn imix_test_main_loop_sleep_twice_short() -> Result<()> {
// Response expectations are poped in reverse order.
let server = Server::run();
server.expect(
Expectation::matching(all_of![
request::method_path("POST", "/graphql"),
request::body(matches(".*ImixPostResult.*main_loop_test_success.*"))
])
.times(2)
.respond_with(status_code(200)
.body(r#"{"data":{"submitTaskResult":{"id":"17179869185"}}}"#)),
);
server.expect(
Expectation::matching(all_of![
request::method_path("POST", "/graphql"),
request::body(matches(".*claimTasks.*"))
])
.times(1)
.respond_with(status_code(200)
.body(r#"{"data":{"claimTasks":[{"id":"17179869185","job":{"id":"4294967297","name":"Sleep1","parameters":"{}","tome":{"id":"21474836482","name":"sleep","description":"sleep stuff","paramDefs":"{}","eldritch":"def test():\n if sys.is_macos():\n sys.shell(\"sleep 3\")\n if sys.is_linux():\n sys.shell(\"sleep 3\")\n if sys.is_windows():\n sys.shell(\"timeout 3\")\ntest()\nprint(\"main_loop_test_success\")","files":[]},"bundle":null}},{"id":"17179869186","job":{"id":"4294967298","name":"Sleep1","parameters":"{}","tome":{"id":"21474836483","name":"sleep","description":"sleep stuff","paramDefs":"{}","eldritch":"def test():\n if sys.is_macos():\n sys.shell(\"sleep 3\")\n if sys.is_linux():\n sys.shell(\"sleep 3\")\n if sys.is_windows():\n sys.shell(\"timeout 3\")\ntest()\nprint(\"main_loop_test_success\")","files":[]},"bundle":null}}]}}"#)),
);

let tmp_file_new = NamedTempFile::new()?;
let path_new = String::from(tmp_file_new.path().to_str().unwrap()).clone();
let url = server.url("/graphql").to_string();
let _ = std::fs::write(path_new.clone(),format!(r#"{{
"service_configs": [],
"target_forward_connect_ip": "127.0.0.1",
"target_name": "test1234",
"callback_config": {{
"interval": 4,
"jitter": 0,
"timeout": 4,
"c2_configs": [
{{
"priority": 1,
"uri": "{url}"
}}
]
}}
}}"#));

let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

// Define a future for our execution task
let start_time = Utc::now().time();
let exec_future = main_loop(path_new, true);
let _result = runtime.block_on(exec_future).unwrap();
let end_time = Utc::now().time();
let diff = (end_time - start_time).num_milliseconds();
assert!(diff < 4500);
Ok(())
}

#[test]
fn imix_test_main_loop_run_once() -> Result<()> {

Expand Down Expand Up @@ -519,7 +577,7 @@ sys.shell(input_params["cmd"])
"target_forward_connect_ip": "127.0.0.1",
"target_name": "test1234",
"callback_config": {{
"interval": 8,
"interval": 4,
"jitter": 1,
"timeout": 4,
"c2_configs": [
Expand All @@ -531,20 +589,18 @@ sys.shell(input_params["cmd"])
}}
}}"#));

let runtime = tokio::runtime::Builder::new_current_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

// let (sender, receiver) = channel::<String>();

// // Define a future for our execution task
// let exec_future = handle_exec_tome(test_tome_input, sender.clone())
let exec_future = main_loop(path_new, true);
let _result = runtime.block_on(exec_future).unwrap();

assert!(true);
Ok(())
}


}