From 05c8fe7f86ec02c940aafcaa55b8909709dce3d4 Mon Sep 17 00:00:00 2001 From: Hennzau Date: Sun, 5 Jan 2025 16:28:20 +0100 Subject: [PATCH] feat: simply fuse daemon and runtime commands without any modification --- zfctl/src/daemon_command.rs | 160 ++++++++++++++++++++++++++++++- zfctl/src/main.rs | 8 -- zfctl/src/runtime_command.rs | 180 ----------------------------------- 3 files changed, 156 insertions(+), 192 deletions(-) delete mode 100644 zfctl/src/runtime_command.rs diff --git a/zfctl/src/daemon_command.rs b/zfctl/src/daemon_command.rs index bd410947..6789cec7 100644 --- a/zfctl/src/daemon_command.rs +++ b/zfctl/src/daemon_command.rs @@ -12,17 +12,28 @@ // ZettaScale Zenoh Team, // -use std::path::PathBuf; +use std::{path::PathBuf, time::Duration}; +use anyhow::anyhow; use async_std::stream::StreamExt; use clap::{ArgGroup, Subcommand}; +use comfy_table::{Row, Table}; use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM}; use signal_hook_async_std::Signals; use zenoh::Session; -use zenoh_flow_commons::{try_parse_from_file, Result, Vars}; -use zenoh_flow_daemon::daemon::{Daemon, ZenohFlowConfiguration}; +use zenoh_flow_commons::{try_parse_from_file, Result, RuntimeId, Vars}; +use zenoh_flow_daemon::{ + daemon::{Daemon, ZenohFlowConfiguration}, + queries::{selector_runtimes, RuntimeStatus, RuntimesQuery}, +}; use zenoh_flow_runtime::Runtime; +use crate::{ + row, + utils::{get_all_runtimes, get_runtime_by_name}, + ZENOH_FLOW_INTERNAL_ERROR, +}; + #[derive(Subcommand)] pub(crate) enum DaemonCommand { /// Launch a Zenoh-Flow Daemon. @@ -53,10 +64,41 @@ pub(crate) enum DaemonCommand { #[arg(short = 'z', long, verbatim_doc_comment)] zenoh_configuration: Option, }, + /// List all the Zenoh-Flow runtimes reachable on the Zenoh network. + List, + /// Returns the status of the provided Zenoh-Flow runtime. + /// + /// The status consists of general information regarding the runtime and the + /// machine it runs on: + /// - the name associated with the Zenoh-Flow runtime, + /// - the number of CPUs the machine running the Zenoh-Flow runtime has, + /// - the total amount of RAM the machine running the Zenoh-Flow runtime has, + /// - for each data flow the Zenoh-Flow runtime manages (partially or not): + /// - its unique identifier, + /// - its name, + /// - its status. + #[command(verbatim_doc_comment)] + #[command(group( + ArgGroup::new("exclusive") + .args(&["runtime_id", "runtime_name"]) + .required(true) + .multiple(false) + ))] + Status { + /// The unique identifier of the Zenoh-Flow runtime to contact. + #[arg(short = 'i', long = "id")] + runtime_id: Option, + /// The name of the Zenoh-Flow runtime to contact. + /// + /// Note that if several runtimes share the same name, the first to + /// answer will be selected. + #[arg(short = 'n', long = "name")] + runtime_name: Option, + }, } impl DaemonCommand { - pub async fn run(self, _session: Session) -> Result<()> { + pub async fn run(self, session: Session) -> Result<()> { match self { DaemonCommand::Start { name, @@ -124,6 +166,116 @@ impl DaemonCommand { }) .await; } + DaemonCommand::List => { + let runtimes = get_all_runtimes(&session).await; + + let mut table = Table::new(); + table.set_width(80); + table.set_header(Row::from(vec!["Identifier", "Name"])); + runtimes.iter().for_each(|info| { + table.add_row(Row::from(vec![&info.id.to_string(), info.name.as_ref()])); + }); + + println!("{table}"); + } + DaemonCommand::Status { + runtime_id, + runtime_name, + } => { + let runtime_id = match (runtime_id, runtime_name) { + (Some(id), _) => id, + (None, Some(name)) => get_runtime_by_name(&session, &name).await, + (None, None) => { + // This code is indeed unreachable because: + // (1) The `group` macro has `required = true` which indicates that clap requires an entry for + // any group. + // (2) The `group` macro has `multiple = false` which indicates that only a single entry for + // any group is accepted. + // (3) The `runtime_id` and `runtime_name` fields belong to the same group "runtime". + // + // => A single entry for the group "runtime" is required (and mandatory). + unreachable!() + } + }; + + let selector = selector_runtimes(&runtime_id); + + let value = serde_json::to_vec(&RuntimesQuery::Status).map_err(|e| { + tracing::error!( + "serde_json failed to serialize `RuntimeQuery::Status`: {:?}", + e + ); + anyhow!(ZENOH_FLOW_INTERNAL_ERROR) + })?; + + let reply = session + .get(selector) + .payload(value) + .timeout(Duration::from_secs(5)) + .await + .map_err(|e| { + anyhow!( + "Failed to query Zenoh-Flow runtime < {} >: {:?}", + runtime_id, + e + ) + })?; + + while let Ok(reply) = reply.recv_async().await { + match reply.result() { + Ok(sample) => { + match serde_json::from_slice::( + &sample.payload().to_bytes(), + ) { + Ok(runtime_status) => { + let mut table = Table::new(); + table.set_width(80); + table.add_row(row!("Identifier", runtime_id)); + table.add_row(row!("Name", runtime_status.name)); + table.add_row(row!( + "Host name", + runtime_status.hostname.unwrap_or_else(|| "N/A".into()) + )); + table.add_row(row!( + "Operating System", + runtime_status + .operating_system + .unwrap_or_else(|| "N/A".into()) + )); + table.add_row(row!( + "Arch", + runtime_status.architecture.unwrap_or_else(|| "N/A".into()) + )); + table.add_row(row!("# CPUs", runtime_status.cpus)); + table.add_row(row!( + "# RAM", + bytesize::to_string(runtime_status.ram_total, true) + )); + println!("{table}"); + + table = Table::new(); + table.set_width(80); + table.set_header(row!("Instance Uuid", "Name", "Status")); + runtime_status.data_flows_status.iter().for_each( + |(uuid, (name, status))| { + table.add_row(row!(uuid, name, status)); + }, + ); + println!("{table}"); + } + Err(e) => { + tracing::error!( + "Failed to parse reply as a `RuntimeStatus`: {:?}", + e + ) + } + } + } + + Err(e) => tracing::error!("Reply to runtime status failed with: {:?}", e), + } + } + } } Ok(()) diff --git a/zfctl/src/main.rs b/zfctl/src/main.rs index 3a424abf..63a52b2b 100644 --- a/zfctl/src/main.rs +++ b/zfctl/src/main.rs @@ -15,9 +15,6 @@ mod instance_command; use instance_command::InstanceCommand; -mod runtime_command; -use runtime_command::RuntimeCommand; - mod daemon_command; use daemon_command::DaemonCommand; @@ -83,10 +80,6 @@ enum Command { runtime_name: Option, }, - /// To interact with a Zenoh-Flow runtime. - #[command(subcommand)] - Runtime(RuntimeCommand), - /// To interact with a Zenoh-Flow daemon. #[command(subcommand)] Daemon(DaemonCommand), @@ -154,7 +147,6 @@ async fn main() -> Result<()> { command.run(session, orchestrator_id).await } - Command::Runtime(command) => command.run(session).await, Command::Daemon(command) => command.run(session).await, Command::RunLocal { flow, diff --git a/zfctl/src/runtime_command.rs b/zfctl/src/runtime_command.rs deleted file mode 100644 index e4524c14..00000000 --- a/zfctl/src/runtime_command.rs +++ /dev/null @@ -1,180 +0,0 @@ -// -// Copyright © 2021 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use std::time::Duration; - -use anyhow::anyhow; -use clap::Subcommand; -use comfy_table::{Row, Table}; -use zenoh::Session; -use zenoh_flow_commons::{Result, RuntimeId}; -use zenoh_flow_daemon::queries::*; - -use super::row; -use crate::{ - utils::{get_all_runtimes, get_runtime_by_name}, - ZENOH_FLOW_INTERNAL_ERROR, -}; - -#[derive(Subcommand)] -pub(crate) enum RuntimeCommand { - /// List all the Zenoh-Flow runtimes reachable on the Zenoh network. - List, - /// Returns the status of the provided Zenoh-Flow runtime. - /// - /// The status consists of general information regarding the runtime and the - /// machine it runs on: - /// - the name associated with the Zenoh-Flow runtime, - /// - the number of CPUs the machine running the Zenoh-Flow runtime has, - /// - the total amount of RAM the machine running the Zenoh-Flow runtime has, - /// - for each data flow the Zenoh-Flow runtime manages (partially or not): - /// - its unique identifier, - /// - its name, - /// - its status. - #[command(verbatim_doc_comment)] - #[group(required = true, multiple = false)] - Status { - /// The unique identifier of the Zenoh-Flow runtime to contact. - /// If no identifier is provided, a random Zenoh-Flow runtime is - /// selected among those reachable. - #[arg(short = 'i', long = "id", group = "runtime")] - runtime_id: Option, - /// The name of the Zenoh-Flow runtime to contact. - /// - /// Note that if several runtimes share the same name, the first to - /// answer will be selected. - #[arg(short = 'n', long = "name", group = "runtime")] - runtime_name: Option, - }, -} - -impl RuntimeCommand { - pub async fn run(self, session: Session) -> Result<()> { - match self { - RuntimeCommand::List => { - let runtimes = get_all_runtimes(&session).await; - - let mut table = Table::new(); - table.set_width(80); - table.set_header(Row::from(vec!["Identifier", "Name"])); - runtimes.iter().for_each(|info| { - table.add_row(Row::from(vec![&info.id.to_string(), info.name.as_ref()])); - }); - - println!("{table}"); - } - - RuntimeCommand::Status { - runtime_id, - runtime_name, - } => { - let runtime_id = match (runtime_id, runtime_name) { - (Some(id), _) => id, - (None, Some(name)) => get_runtime_by_name(&session, &name).await, - (None, None) => { - // This code is indeed unreachable because: - // (1) The `group` macro has `required = true` which indicates that clap requires an entry for - // any group. - // (2) The `group` macro has `multiple = false` which indicates that only a single entry for - // any group is accepted. - // (3) The `runtime_id` and `runtime_name` fields belong to the same group "runtime". - // - // => A single entry for the group "runtime" is required (and mandatory). - unreachable!() - } - }; - - let selector = selector_runtimes(&runtime_id); - - let value = serde_json::to_vec(&RuntimesQuery::Status).map_err(|e| { - tracing::error!( - "serde_json failed to serialize `RuntimeQuery::Status`: {:?}", - e - ); - anyhow!(ZENOH_FLOW_INTERNAL_ERROR) - })?; - - let reply = session - .get(selector) - .payload(value) - .timeout(Duration::from_secs(5)) - .await - .map_err(|e| { - anyhow!( - "Failed to query Zenoh-Flow runtime < {} >: {:?}", - runtime_id, - e - ) - })?; - - while let Ok(reply) = reply.recv_async().await { - match reply.result() { - Ok(sample) => { - match serde_json::from_slice::( - &sample.payload().to_bytes(), - ) { - Ok(runtime_status) => { - let mut table = Table::new(); - table.set_width(80); - table.add_row(row!("Identifier", runtime_id)); - table.add_row(row!("Name", runtime_status.name)); - table.add_row(row!( - "Host name", - runtime_status.hostname.unwrap_or_else(|| "N/A".into()) - )); - table.add_row(row!( - "Operating System", - runtime_status - .operating_system - .unwrap_or_else(|| "N/A".into()) - )); - table.add_row(row!( - "Arch", - runtime_status.architecture.unwrap_or_else(|| "N/A".into()) - )); - table.add_row(row!("# CPUs", runtime_status.cpus)); - table.add_row(row!( - "# RAM", - bytesize::to_string(runtime_status.ram_total, true) - )); - println!("{table}"); - - table = Table::new(); - table.set_width(80); - table.set_header(row!("Instance Uuid", "Name", "Status")); - runtime_status.data_flows_status.iter().for_each( - |(uuid, (name, status))| { - table.add_row(row!(uuid, name, status)); - }, - ); - println!("{table}"); - } - Err(e) => { - tracing::error!( - "Failed to parse reply as a `RuntimeStatus`: {:?}", - e - ) - } - } - } - - Err(e) => tracing::error!("Reply to runtime status failed with: {:?}", e), - } - } - } - } - - Ok(()) - } -}