Skip to content

Commit

Permalink
Adding --tail for top
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuajerin committed Mar 3, 2024
1 parent 46a6408 commit 0670107
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 83 deletions.
170 changes: 88 additions & 82 deletions tembo-cli/src/cmd/top.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::cli::context::Profile;
use crate::cli::context::{get_current_context, Environment};
use crate::cli::tembo_config::InstanceSettings;
use crate::cmd::apply::get_instance_settings;
Expand All @@ -19,7 +20,10 @@ use tokio::runtime::Runtime;
use tokio::time::Duration;

#[derive(Args)]
pub struct TopCommand {}
pub struct TopCommand {
#[clap(long)]
pub tail: bool,
}

//Format to display the response. Will be changed in beautify.
#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -72,13 +76,10 @@ async fn fetch_metrics_loop(
config: &Configuration,
env: Environment,
instance_settings: HashMap<String, InstanceSettings>,
profile: &Profile,
) -> Result<()> {
let mut stdout = stdout();
let client = reqwest::Client::new();
let profile = env
.selected_profile
.as_ref()
.context("Expected environment to have a selected profile")?;
let url = profile.tembo_data_host.clone();

let mut headers = HeaderMap::new();
Expand All @@ -89,17 +90,16 @@ async fn fetch_metrics_loop(
.expect("JWT Token is not configured");
headers.insert("Authorization", format!("Bearer {}", jwt_token).parse()?);

loop {
execute!(stdout, Clear(ClearType::All))?;
execute!(stdout, Clear(ClearType::All))?;

for value in instance_settings.values() {
let org_name = get_instance_org_name(config, &env, &value.instance_name).await?;
let namespace = format!("org-{}-inst-{}", org_name, &value.instance_name);
let namespace_encoded = urlencoding::encode(&namespace);
for value in instance_settings.values() {
let org_name = get_instance_org_name(config, &env, &value.instance_name).await?;
let namespace = format!("org-{}-inst-{}", org_name, &value.instance_name);
let namespace_encoded = urlencoding::encode(&namespace);

println!("Instance: {}", &value.instance_name);
println!("Instance: {}", &value.instance_name);

let metric_queries = vec![
let metric_queries = vec![
(
"Cpu",
format!("sum by (pod) (node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{ namespace=\"{}\", container=\"postgres\" }}) / avg by (pod) (kube_pod_container_resource_limits{{ job=\"kube-state-metrics\", namespace=\"{}\", container=\"postgres\", resource=\"cpu\" }})*100", namespace_encoded, namespace_encoded),
Expand Down Expand Up @@ -135,69 +135,66 @@ async fn fetch_metrics_loop(
),*/
];

for (query_name, query1, query2) in &metric_queries {
let result1 =
fetch_metric(query1, &namespace_encoded, &client, &headers, &url).await;
let result2 =
fetch_metric(query2, &namespace_encoded, &client, &headers, &url).await;
for (query_name, query1, query2) in &metric_queries {
let result1 = fetch_metric(query1, &namespace_encoded, &client, &headers, &url).await;
let result2 = fetch_metric(query2, &namespace_encoded, &client, &headers, &url).await;

match (result1, result2) {
(Ok(metrics_response1), Ok(metrics_response2)) => {
let raw_value1: f64 = match metrics_response1.data.result.first() {
Some(metric_result) => match metric_result.value.1.parse::<f64>() {
Ok(parsed_value) => parsed_value,
Err(_) => {
eprintln!(
"Error parsing value for {}: defaulting to 0.0",
query_name
);
0.0
}
},
None => {
eprintln!("No result found for {}: defaulting to 0.0", query_name);
match (result1, result2) {
(Ok(metrics_response1), Ok(metrics_response2)) => {
let raw_value1: f64 = match metrics_response1.data.result.first() {
Some(metric_result) => match metric_result.value.1.parse::<f64>() {
Ok(parsed_value) => parsed_value,
Err(_) => {
eprintln!(
"Error parsing value for {}: defaulting to 0.0",
query_name
);
0.0
}
};
let raw_value2: f64 = match metrics_response2.data.result.first() {
Some(metric_result) => match metric_result.value.1.parse::<f64>() {
Ok(parsed_value) => parsed_value,
Err(_) => {
eprintln!(
"Error parsing value for {}: defaulting to 0.0",
query_name
);
0.0
}
},
None => {
eprintln!("No result found for {}: defaulting to 0.0", query_name);
},
None => {
eprintln!("No result found for {}: defaulting to 0.0", query_name);
0.0
}
};
let raw_value2: f64 = match metrics_response2.data.result.first() {
Some(metric_result) => match metric_result.value.1.parse::<f64>() {
Ok(parsed_value) => parsed_value,
Err(_) => {
eprintln!(
"Error parsing value for {}: defaulting to 0.0",
query_name
);
0.0
}
};
},
None => {
eprintln!("No result found for {}: defaulting to 0.0", query_name);
0.0
}
};

let value1 = format!("{:.2}", raw_value1.abs());
let value1 = format!("{:.2}", raw_value1.abs());

if *query_name == "Storage" || *query_name == "Memory" {
let value2 = format!("{:.2}", raw_value2.abs());
println!("{}: {} | {}%", query_name, value2, value1);
} else {
let value2 = format!("{}", raw_value2.abs());
println!("{}: {} | {}%", query_name, value2, value1);
}
}
(Err(e), _) | (_, Err(e)) => {
eprintln!("Error fetching metrics for {}: {}", query_name, e);
if *query_name == "Storage" || *query_name == "Memory" {
let value2 = format!("{:.2}", raw_value2.abs());
println!("{}: {} | {}%", query_name, value2, value1);
} else {
let value2 = format!("{}", raw_value2.abs());
println!("{}: {} | {}%", query_name, value2, value1);
}
}
(Err(e), _) | (_, Err(e)) => {
eprintln!("Error fetching metrics for {}: {}", query_name, e);
}
}

println!();
}

stdout.flush()?;
tokio::time::sleep(Duration::from_secs(2)).await;
println!();
}

stdout.flush()?;
Ok(())
}

async fn fetch_metric(
Expand Down Expand Up @@ -254,24 +251,7 @@ async fn get_instance_org_name(
}
}

//Function to tackle async
fn blocking(config: &Configuration, env: &Environment) -> Result<(), anyhow::Error> {
let rt = match Runtime::new() {
Ok(rt) => rt,
Err(e) => return Err(anyhow!("Failed to create Tokio runtime: {}", e)),
};

let instance_settings = get_instance_settings(None, None)?;
rt.block_on(async {
match fetch_metrics_loop(config, env.clone(), instance_settings).await {
Ok(_) => (),
Err(e) => eprintln!("Error fetching metrics: {}", e),
}
});
Ok(())
}

pub fn execute(verbose: bool) -> Result<(), anyhow::Error> {
pub fn execute(verbose: bool, top_command: TopCommand) -> Result<(), anyhow::Error> {
println!("WARNING! EXPERIMENTAL FEATURE!!");
super::validate::execute(verbose)?;
let env = get_current_context().context("Failed to get current context")?;
Expand All @@ -284,6 +264,32 @@ pub fn execute(verbose: bool) -> Result<(), anyhow::Error> {
bearer_access_token: Some(profile.tembo_access_token.clone()),
..Default::default()
};
let _result = blocking(&config, &env);
let instance_settings = get_instance_settings(None, None)?;

//Looking for --tail here
if top_command.tail {
let rt = Runtime::new().map_err(|e| anyhow!("Failed to create Tokio runtime: {}", e))?;
rt.block_on(async {
loop {
if let Err(e) =
fetch_metrics_loop(&config, env.clone(), instance_settings.clone(), profile)
.await
{
eprintln!("Error fetching metrics: {}", e);
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
});
} else {
let rt = Runtime::new().map_err(|e| anyhow!("Failed to create Tokio runtime: {}", e))?;
rt.block_on(async {
if let Err(e) =
fetch_metrics_loop(&config, env.clone(), instance_settings.clone(), profile).await
{
eprintln!("Error fetching metrics: {}", e);
}
});
}

Ok(())
}
2 changes: 1 addition & 1 deletion tembo-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn main() -> Result<(), anyhow::Error> {
login::execute()?;
}
SubCommands::Top(_top_cmd) => {
top::execute(app.global_opts.verbose)?;
top::execute(app.global_opts.verbose, _top_cmd)?;
}
}

Expand Down

0 comments on commit 0670107

Please sign in to comment.