diff --git a/tembo-cli/src/cmd/top.rs b/tembo-cli/src/cmd/top.rs
index 9129141b7..ef9213df6 100644
--- a/tembo-cli/src/cmd/top.rs
+++ b/tembo-cli/src/cmd/top.rs
@@ -1,3 +1,4 @@
+use crate::cli::context::Profile;
 use crate::cli::context::{get_current_context, Environment};
 use crate::cli::tembo_config::InstanceSettings;
 use crate::cmd::apply::get_instance_settings;
@@ -19,7 +20,10 @@ use tokio::runtime::Runtime;
 use tokio::time::Duration;

 #[derive(Args)]
-pub struct TopCommand {}
+pub struct TopCommand {
+    #[clap(long)]
+    pub tail: bool,
+}

 //Format to display the response. Will be changed in beautify.
 #[derive(Serialize, Deserialize, Debug)]
@@ -72,13 +76,10 @@ async fn fetch_metrics_loop(
     config: &Configuration,
     env: Environment,
     instance_settings: HashMap<String, InstanceSettings>,
+    profile: &Profile,
 ) -> Result<()> {
     let mut stdout = stdout();
     let client = reqwest::Client::new();
-    let profile = env
-        .selected_profile
-        .as_ref()
-        .context("Expected environment to have a selected profile")?;
     let url = profile.tembo_data_host.clone();

     let mut headers = HeaderMap::new();
@@ -89,17 +90,16 @@ async fn fetch_metrics_loop(
         .expect("JWT Token is not configured");
     headers.insert("Authorization", format!("Bearer {}", jwt_token).parse()?);

-    loop {
-        execute!(stdout, Clear(ClearType::All))?;
+    execute!(stdout, Clear(ClearType::All))?;

-        for value in instance_settings.values() {
-            let org_name = get_instance_org_name(config, &env, &value.instance_name).await?;
-            let namespace = format!("org-{}-inst-{}", org_name, &value.instance_name);
-            let namespace_encoded = urlencoding::encode(&namespace);
+    for value in instance_settings.values() {
+        let org_name = get_instance_org_name(config, &env, &value.instance_name).await?;
+        let namespace = format!("org-{}-inst-{}", org_name, &value.instance_name);
+        let namespace_encoded = urlencoding::encode(&namespace);

-            println!("Instance: {}", &value.instance_name);
+        println!("Instance: {}", &value.instance_name);

-            let metric_queries = vec![
+        let metric_queries = vec![
                 (
                     "Cpu",
                     format!("sum by (pod) (node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{ namespace=\"{}\", container=\"postgres\" }}) / avg by (pod) (kube_pod_container_resource_limits{{ job=\"kube-state-metrics\", namespace=\"{}\", container=\"postgres\", resource=\"cpu\" }})*100", namespace_encoded, namespace_encoded),
@@ -135,69 +135,66 @@ async fn fetch_metrics_loop(
                 ),*/
             ];

-            for (query_name, query1, query2) in &metric_queries {
-                let result1 =
-                    fetch_metric(query1, &namespace_encoded, &client, &headers, &url).await;
-                let result2 =
-                    fetch_metric(query2, &namespace_encoded, &client, &headers, &url).await;
+        for (query_name, query1, query2) in &metric_queries {
+            let result1 = fetch_metric(query1, &namespace_encoded, &client, &headers, &url).await;
+            let result2 = fetch_metric(query2, &namespace_encoded, &client, &headers, &url).await;

-                match (result1, result2) {
-                    (Ok(metrics_response1), Ok(metrics_response2)) => {
-                        let raw_value1: f64 = match metrics_response1.data.result.first() {
-                            Some(metric_result) => match metric_result.value.1.parse::<f64>() {
-                                Ok(parsed_value) => parsed_value,
-                                Err(_) => {
-                                    eprintln!(
-                                        "Error parsing value for {}: defaulting to 0.0",
-                                        query_name
-                                    );
-                                    0.0
-                                }
-                            },
-                            None => {
-                                eprintln!("No result found for {}: defaulting to 0.0", query_name);
+            match (result1, result2) {
+                (Ok(metrics_response1), Ok(metrics_response2)) => {
+                    let raw_value1: f64 = match metrics_response1.data.result.first() {
+                        Some(metric_result) => match metric_result.value.1.parse::<f64>() {
+                            Ok(parsed_value) => parsed_value,
+                            Err(_) => {
+                                eprintln!(
+                                    "Error parsing value for {}: defaulting to 0.0",
+                                    query_name
+                                );
                                 0.0
                             }
-                        };
-                        let raw_value2: f64 = match metrics_response2.data.result.first() {
-                            Some(metric_result) => match metric_result.value.1.parse::<f64>() {
-                                Ok(parsed_value) => parsed_value,
-                                Err(_) => {
-                                    eprintln!(
-                                        "Error parsing value for {}: defaulting to 0.0",
-                                        query_name
-                                    );
-                                    0.0
-                                }
-                            },
-                            None => {
-                                eprintln!("No result found for {}: defaulting to 0.0", query_name);
+                        },
+                        None => {
+                            eprintln!("No result found for {}: defaulting to 0.0", query_name);
+                            0.0
+                        }
+                    };
+                    let raw_value2: f64 = match metrics_response2.data.result.first() {
+                        Some(metric_result) => match metric_result.value.1.parse::<f64>() {
+                            Ok(parsed_value) => parsed_value,
+                            Err(_) => {
+                                eprintln!(
+                                    "Error parsing value for {}: defaulting to 0.0",
+                                    query_name
+                                );
                                 0.0
                             }
-                        };
+                        },
+                        None => {
+                            eprintln!("No result found for {}: defaulting to 0.0", query_name);
+                            0.0
+                        }
+                    };

-                        let value1 = format!("{:.2}", raw_value1.abs());
+                    let value1 = format!("{:.2}", raw_value1.abs());

-                        if *query_name == "Storage" || *query_name == "Memory" {
-                            let value2 = format!("{:.2}", raw_value2.abs());
-                            println!("{}: {} | {}%", query_name, value2, value1);
-                        } else {
-                            let value2 = format!("{}", raw_value2.abs());
-                            println!("{}: {} | {}%", query_name, value2, value1);
-                        }
-                    }
-                    (Err(e), _) | (_, Err(e)) => {
-                        eprintln!("Error fetching metrics for {}: {}", query_name, e);
+                    if *query_name == "Storage" || *query_name == "Memory" {
+                        let value2 = format!("{:.2}", raw_value2.abs());
+                        println!("{}: {} | {}%", query_name, value2, value1);
+                    } else {
+                        let value2 = format!("{}", raw_value2.abs());
+                        println!("{}: {} | {}%", query_name, value2, value1);
                     }
                 }
+                (Err(e), _) | (_, Err(e)) => {
+                    eprintln!("Error fetching metrics for {}: {}", query_name, e);
+                }
             }
-
-            println!();
         }

-        stdout.flush()?;
-        tokio::time::sleep(Duration::from_secs(2)).await;
+        println!();
     }
+
+    stdout.flush()?;
+    Ok(())
 }

 async fn fetch_metric(
@@ -254,24 +251,7 @@ async fn get_instance_org_name(
     }
 }

-//Function to tackle async
-fn blocking(config: &Configuration, env: &Environment) -> Result<(), anyhow::Error> {
-    let rt = match Runtime::new() {
-        Ok(rt) => rt,
-        Err(e) => return Err(anyhow!("Failed to create Tokio runtime: {}", e)),
-    };
-
-    let instance_settings = get_instance_settings(None, None)?;
-    rt.block_on(async {
-        match fetch_metrics_loop(config, env.clone(), instance_settings).await {
-            Ok(_) => (),
-            Err(e) => eprintln!("Error fetching metrics: {}", e),
-        }
-    });
-    Ok(())
-}
-
-pub fn execute(verbose: bool) -> Result<(), anyhow::Error> {
+pub fn execute(verbose: bool, top_command: TopCommand) -> Result<(), anyhow::Error> {
     println!("WARNING! EXPERIMENTAL FEATURE!!");
     super::validate::execute(verbose)?;
     let env = get_current_context().context("Failed to get current context")?;
@@ -284,6 +264,32 @@ pub fn execute(verbose: bool) -> Result<(), anyhow::Error> {
         bearer_access_token: Some(profile.tembo_access_token.clone()),
         ..Default::default()
     };
-    let _result = blocking(&config, &env);
+    let instance_settings = get_instance_settings(None, None)?;
+
+    //Looking for --tail here
+    if top_command.tail {
+        let rt = Runtime::new().map_err(|e| anyhow!("Failed to create Tokio runtime: {}", e))?;
+        rt.block_on(async {
+            loop {
+                if let Err(e) =
+                    fetch_metrics_loop(&config, env.clone(), instance_settings.clone(), profile)
+                        .await
+                {
+                    eprintln!("Error fetching metrics: {}", e);
+                }
+                tokio::time::sleep(Duration::from_secs(2)).await;
+            }
+        });
+    } else {
+        let rt = Runtime::new().map_err(|e| anyhow!("Failed to create Tokio runtime: {}", e))?;
+        rt.block_on(async {
+            if let Err(e) =
+                fetch_metrics_loop(&config, env.clone(), instance_settings.clone(), profile).await
+            {
+                eprintln!("Error fetching metrics: {}", e);
+            }
+        });
+    }
+
     Ok(())
 }
diff --git a/tembo-cli/src/main.rs b/tembo-cli/src/main.rs
index d83375602..e42acc526 100644
--- a/tembo-cli/src/main.rs
+++ b/tembo-cli/src/main.rs
@@ -78,7 +78,7 @@ fn main() -> Result<(), anyhow::Error> {
             login::execute()?;
         }
         SubCommands::Top(_top_cmd) => {
-            top::execute(app.global_opts.verbose)?;
+            top::execute(app.global_opts.verbose, _top_cmd)?;
         }
     }
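A minimal, self-contained sketch of the control flow this patch introduces, assuming the `tokio` crate with its full feature set; `render_snapshot` and `run` are illustrative stand-ins, not code from the repository. `fetch_metrics_loop` now draws a single snapshot and returns, and the caller repeats it every two seconds only when `--tail` is passed:

```rust
// Illustrative sketch only (not from the repository): shows the split between a
// single metrics snapshot and the optional --tail refresh loop.
use std::time::Duration;
use tokio::runtime::Runtime;

// Stand-in for fetch_metrics_loop: renders one snapshot and returns.
async fn render_snapshot() -> Result<(), String> {
    println!("metrics snapshot");
    Ok(())
}

fn run(tail: bool) -> Result<(), String> {
    let rt = Runtime::new().map_err(|e| format!("Failed to create Tokio runtime: {}", e))?;
    rt.block_on(async {
        loop {
            if let Err(e) = render_snapshot().await {
                eprintln!("Error fetching metrics: {}", e);
            }
            if !tail {
                // Without --tail: print once and exit.
                break;
            }
            // With --tail: refresh every two seconds, as the removed inner loop did.
            tokio::time::sleep(Duration::from_secs(2)).await;
        }
    });
    Ok(())
}

fn main() -> Result<(), String> {
    // `tembo top` ~ run(false); `tembo top --tail` ~ run(true).
    run(false)
}
```

In the sketch, `tembo top` corresponds to `run(false)` and `tembo top --tail` to `run(true)`, mirroring the two `rt.block_on` branches that `execute` gains in the patch.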