diff --git a/Cargo.lock b/Cargo.lock
index 4315d8733b5b..f9eb63f3606d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5104,6 +5104,7 @@ dependencies = [
  "futures",
  "log",
  "rand 0.8.5",
+ "reqwest 0.12.4",
  "serde",
  "serde_json",
  "tokio",
diff --git a/src/meta/binaries/Cargo.toml b/src/meta/binaries/Cargo.toml
index b078d32c291f..d201d2032092 100644
--- a/src/meta/binaries/Cargo.toml
+++ b/src/meta/binaries/Cargo.toml
@@ -44,6 +44,7 @@ fastrace = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
 rand = { workspace = true }
+reqwest = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 tokio = { workspace = true }
diff --git a/src/meta/binaries/metactl/admin.rs b/src/meta/binaries/metactl/admin.rs
new file mode 100644
index 000000000000..b5c53cb4c4ee
--- /dev/null
+++ b/src/meta/binaries/metactl/admin.rs
@@ -0,0 +1,92 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use reqwest::Client;
+use serde::Deserialize;
+
+pub struct MetaAdminClient {
+    client: Client,
+    endpoint: String,
+}
+
+impl MetaAdminClient {
+    pub fn new(addr: &str) -> Self {
+        let client = Client::new();
+        MetaAdminClient {
+            client,
+            endpoint: format!("http://{}", addr),
+        }
+    }
+
+    pub async fn status(&self) -> anyhow::Result<AdminStatusResponse> {
+        let resp = self
+            .client
+            .get(format!("{}/v1/cluster/status", self.endpoint))
+            .send()
+            .await?;
+        let status = resp.status();
+        if status.is_success() {
+            let result = resp.json::<AdminStatusResponse>().await?;
+            Ok(result)
+        } else {
+            let data = resp.bytes().await?;
+            let msg = String::from_utf8_lossy(&data);
+            Err(anyhow::anyhow!("status code: {}, msg: {}", status, msg))
+        }
+    }
+
+    pub async fn transfer_leader(
+        &self,
+        target: Option<u64>,
+    ) -> anyhow::Result<AdminTransferLeaderResponse> {
+        let resp = match target {
+            Some(to) => {
+                self.client
+                    .get(format!(
+                        "{}/v1/ctrl/trigger_transfer_leader?to={}",
+                        self.endpoint, to
+                    ))
+                    .send()
+                    .await?
+            }
+            None => {
+                self.client
+                    .get(format!("{}/v1/ctrl/trigger_transfer_leader", self.endpoint))
+                    .send()
+                    .await?
+            }
+        };
+        let status = resp.status();
+        if status.is_success() {
+            let result = resp.json::<AdminTransferLeaderResponse>().await?;
+            Ok(result)
+        } else {
+            let data = resp.bytes().await?;
+            let msg = String::from_utf8_lossy(&data);
+            Err(anyhow::anyhow!("status code: {}, msg: {}", status, msg))
+        }
+    }
+}
+
+#[derive(Deserialize, Debug)]
+pub struct AdminStatusResponse {
+    pub name: String,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct AdminTransferLeaderResponse {
+    pub from: u64,
+    pub to: u64,
+    pub voter_ids: Vec<u64>,
+}
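Note: a minimal sketch of driving the new `MetaAdminClient`. Only `new`, `status`, and `transfer_leader` come from admin.rs above; the `tokio::main` wrapper and the `127.0.0.1:28002` admin address are illustrative assumptions, not part of this change:

    // Hypothetical caller; assumes the meta-service admin API listens on 127.0.0.1:28002.
    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let client = MetaAdminClient::new("127.0.0.1:28002");

        // GET {endpoint}/v1/cluster/status
        let status = client.status().await?;
        println!("node name: {}", status.name);

        // GET {endpoint}/v1/ctrl/trigger_transfer_leader?to=2
        let result = client.transfer_leader(Some(2)).await?;
        println!("leader moved from {} to {}", result.from, result.to);
        Ok(())
    }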
diff --git a/src/meta/binaries/metactl/export_from_disk.rs b/src/meta/binaries/metactl/export_from_disk.rs
index cfb954f798f6..1215e3a78a17 100644
--- a/src/meta/binaries/metactl/export_from_disk.rs
+++ b/src/meta/binaries/metactl/export_from_disk.rs
@@ -21,7 +21,7 @@ use databend_meta::store::StoreInner;
 use futures::TryStreamExt;
 
 use crate::upgrade;
-use crate::Config;
+use crate::ExportArgs;
 
 /// Print the entire sled db.
 ///
@@ -29,22 +29,21 @@ use crate::Config;
 /// `[sled_tree_name, {key_space: {key, value}}]`
 /// E.g.:
 /// `["state_machine/0",{"GenericKV":{"key":"wow","value":{"seq":3,"meta":null,"data":[119,111,119]}}}]`
-pub async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
-    upgrade::upgrade(config).await?;
+pub async fn export_from_dir(args: &ExportArgs) -> anyhow::Result<()> {
+    let raft_config: RaftConfig = args.clone().into();
+    upgrade::upgrade(&raft_config).await?;
 
     eprintln!();
     eprintln!("Export:");
 
-    let raft_config: RaftConfig = config.clone().into();
-
     let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?;
     let mut lines = Arc::new(sto_inn).export();
 
     eprintln!("    From: {}", raft_config.raft_dir);
 
-    let file: Option<File> = if !config.db.is_empty() {
-        eprintln!("    To: File: {}", config.db);
-        Some((File::create(&config.db))?)
+    let file: Option<File> = if !args.db.is_empty() {
+        eprintln!("    To: File: {}", args.db);
+        Some((File::create(&args.db))?)
     } else {
         eprintln!("    To: <stdout>");
         None
diff --git a/src/meta/binaries/metactl/export_from_grpc.rs b/src/meta/binaries/metactl/export_from_grpc.rs
index 82c2b7d3fa60..8c497cc0ed3f 100644
--- a/src/meta/binaries/metactl/export_from_grpc.rs
+++ b/src/meta/binaries/metactl/export_from_grpc.rs
@@ -25,24 +25,19 @@ use databend_common_meta_types::protobuf;
 use tokio::net::TcpSocket;
 use tokio_stream::StreamExt;
 
-use crate::Config;
+use crate::ExportArgs;
 
 /// Dump metasrv data, raft-log, state machine etc in json to stdout.
-pub async fn export_from_running_node(config: &Config) -> Result<(), anyhow::Error> {
+pub async fn export_from_running_node(args: &ExportArgs) -> Result<(), anyhow::Error> {
     eprintln!();
     eprintln!("Export:");
-    eprintln!("    From: online meta-service: {}", config.grpc_api_address);
-    eprintln!("    Export To: {}", config.db);
-    eprintln!("    Export Chunk Size: {:?}", config.export_chunk_size);
-
-    let grpc_api_addr = get_available_socket_addr(&config.grpc_api_address).await?;
-
-    export_from_grpc(
-        grpc_api_addr.to_string().as_str(),
-        config.db.clone(),
-        config.export_chunk_size,
-    )
-    .await?;
+    eprintln!("    From: online meta-service: {}", args.grpc_api_address);
+    eprintln!("    Export To: {}", args.db);
+    eprintln!("    Export Chunk Size: {:?}", args.chunk_size);
+
+    let grpc_api_addr = get_available_socket_addr(args.grpc_api_address.as_str()).await?;
+    let addr = grpc_api_addr.to_string();
+    export_from_grpc(addr.as_str(), args.db.clone(), args.chunk_size).await?;
     Ok(())
 }
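Note: each exported line is a two-element JSON array, `[sled_tree_name, {key_space: {key, value}}]`, as the doc comment in export_from_disk.rs states. A minimal sketch of consuming one such line with serde_json (the `parse_export_line` helper name is hypothetical, and serde_json as a dependency is assumed):

    use serde_json::Value;

    // Hypothetical helper: split one exported line into its tree name and payload.
    fn parse_export_line(line: &str) -> anyhow::Result<(String, Value)> {
        // A line like `["state_machine/0",{"GenericKV":{...}}]` deserializes
        // directly into a (tree_name, payload) pair.
        let parsed: (String, Value) = serde_json::from_str(line)?;
        Ok(parsed)
    }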
diff --git a/src/meta/binaries/metactl/import.rs b/src/meta/binaries/metactl/import.rs
index 9297c3568847..6f3178ee73de 100644
--- a/src/meta/binaries/metactl/import.rs
+++ b/src/meta/binaries/metactl/import.rs
@@ -55,38 +55,38 @@ use url::Url;
 
 use crate::reading;
 use crate::upgrade;
-use crate::Config;
+use crate::ImportArgs;
 
-pub async fn import_data(config: &Config) -> anyhow::Result<()> {
-    let raft_dir = config.raft_dir.clone().unwrap_or_default();
+pub async fn import_data(args: &ImportArgs) -> anyhow::Result<()> {
+    let raft_dir = args.raft_dir.clone().unwrap_or_default();
 
     eprintln!();
     eprintln!("Import:");
     eprintln!("    Into Meta Dir: '{}'", raft_dir);
-    eprintln!("    Initialize Cluster with Id: {}, cluster: {{", config.id);
-    for peer in config.initial_cluster.clone() {
+    eprintln!("    Initialize Cluster with Id: {}, cluster: {{", args.id);
+    for peer in args.initial_cluster.clone() {
         eprintln!("        Peer: {}", peer);
     }
     eprintln!("    }}");
 
-    let nodes = build_nodes(config.initial_cluster.clone(), config.id)?;
+    let nodes = build_nodes(args.initial_cluster.clone(), args.id)?;
 
     init_sled_db(raft_dir.clone(), 64 * 1024 * 1024 * 1024);
 
-    clear(config)?;
-    let max_log_id = import_from_stdin_or_file(config).await?;
+    clear(args)?;
+    let max_log_id = import_from_stdin_or_file(args).await?;
 
-    if config.initial_cluster.is_empty() {
+    if args.initial_cluster.is_empty() {
         return Ok(());
     }
 
-    init_new_cluster(config, nodes, max_log_id, config.id).await?;
+    init_new_cluster(args, nodes, max_log_id).await?;
     Ok(())
 }
 
 /// Import from lines of exported data and return the max log id that is found.
 async fn import_lines<B: BufRead + 'static>(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: Lines<B>,
 ) -> anyhow::Result<Option<LogId>> {
     #[allow(clippy::useless_conversion)]
@@ -106,8 +106,8 @@ async fn import_lines<B: BufRead + 'static>(
             please use an older version databend-metactl to import from V001"
             ));
         }
-        DataVersion::V002 => import_v002(config, it).await?,
-        DataVersion::V003 => import_v003(config, it).await?,
+        DataVersion::V002 => import_v002(raft_config, it).await?,
+        DataVersion::V003 => import_v003(raft_config, it).await?,
     };
 
     Ok(max_log_id)
@@ -119,11 +119,11 @@ async fn import_lines<B: BufRead + 'static>(
 ///
 /// It writes logs and related entries to sled trees, and state_machine entries to a snapshot.
 async fn import_v002(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: impl IntoIterator<Item = io::Result<String>>,
 ) -> anyhow::Result<Option<LogId>> {
     // v002 and v003 share the same exported data format.
-    import_v003(config, lines).await
+    import_v003(raft_config, lines).await
 }
 
 /// Import serialized lines for `DataVersion::V003`
 ///
@@ -132,11 +132,9 @@ async fn import_v002(
 ///
 /// It writes logs and related entries to sled trees, and state_machine entries to a snapshot.
 async fn import_v003(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: impl IntoIterator<Item = io::Result<String>>,
 ) -> anyhow::Result<Option<LogId>> {
-    let raft_config: RaftConfig = config.clone().into();
-
     let db = get_sled_db();
 
     let mut n = 0;
@@ -221,24 +219,26 @@ async fn import_v003(
 /// Insert them into sled db and flush.
 ///
 /// Finally upgrade the data in raft_dir to the latest version.
-async fn import_from_stdin_or_file(config: &Config) -> anyhow::Result<Option<LogId>> {
-    let restore = config.db.clone();
+async fn import_from_stdin_or_file(args: &ImportArgs) -> anyhow::Result<Option<LogId>> {
+    let restore = args.db.clone();
+    let raft_config: RaftConfig = args.clone().into();
 
     let max_log_id = if restore.is_empty() {
         eprintln!("    From: <stdin>");
         let lines = io::stdin().lines();
 
-        import_lines(config, lines).await?
+        import_lines(raft_config, lines).await?
     } else {
-        eprintln!("    From: {}", config.db);
+        eprintln!("    From: {}", args.db);
         let file = File::open(restore)?;
         let reader = BufReader::new(file);
         let lines = reader.lines();
 
-        import_lines(config, lines).await?
+        import_lines(raft_config, lines).await?
     };
 
-    upgrade::upgrade(config).await?;
+    let raft_config: RaftConfig = args.clone().into();
+    upgrade::upgrade(&raft_config).await?;
 
     Ok(max_log_id)
 }
@@ -298,16 +298,15 @@ fn build_nodes(initial_cluster: Vec<String>, id: u64) -> anyhow::Result<BTreeMap<NodeId, Node>> {
 
 async fn init_new_cluster(
-    config: &Config,
+    args: &ImportArgs,
     nodes: BTreeMap<NodeId, Node>,
     max_log_id: Option<LogId>,
-    id: u64,
 ) -> anyhow::Result<()> {
     eprintln!();
     eprintln!("Initialize Cluster with: {:?}", nodes);
 
     let db = get_sled_db();
 
-    let raft_config: RaftConfig = config.clone().into();
+    let raft_config: RaftConfig = args.clone().into();
 
     let mut sto = RaftStore::open_create(&raft_config, Some(()), None).await?;
@@ -375,13 +374,13 @@ async fn init_new_cluster(
     // Reset node id
     let raft_state = RaftState::open_create(&db, &raft_config, Some(()), None).await?;
-    raft_state.set_node_id(id).await?;
+    raft_state.set_node_id(args.id).await?;
 
     Ok(())
 }
 
 /// Clear all sled data and on-disk snapshot.
-fn clear(config: &Config) -> anyhow::Result<()> {
+fn clear(args: &ImportArgs) -> anyhow::Result<()> {
     eprintln!();
     eprintln!("Clear All Sled Trees Before Import:");
     let db = get_sled_db();
@@ -394,7 +393,7 @@ fn clear(config: &Config) -> anyhow::Result<()> {
         eprintln!("    Cleared sled tree: {}", name);
     }
 
-    let df_meta_path = format!("{}/df_meta", config.raft_dir.clone().unwrap_or_default());
+    let df_meta_path = format!("{}/df_meta", args.raft_dir.clone().unwrap_or_default());
     if Path::new(&df_meta_path).exists() {
         remove_dir_all(&df_meta_path)?;
     }
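Note: `build_nodes` consumes `initial_cluster` entries in the `node_id=endpoint,grpc_api_addr` form documented in main.rs below, and the flag can be repeated once per node since the field is a `Vec<String>`. A hypothetical three-node restore (all ids and addresses here are made-up examples):

    databend-metactl import --raft-dir ./_meta_dir --db exported.db \
        --initial-cluster 1=localhost:29103,127.0.0.1:9191 \
        --initial-cluster 2=localhost:29203,127.0.0.1:9192 \
        --initial-cluster 3=localhost:29303,127.0.0.1:9193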
diff --git a/src/meta/binaries/metactl/main.rs b/src/meta/binaries/metactl/main.rs
index dce759632d3d..8900ce45ecda 100644
--- a/src/meta/binaries/metactl/main.rs
+++ b/src/meta/binaries/metactl/main.rs
@@ -16,6 +16,7 @@
 
 mod export_from_grpc;
 
+pub mod admin;
 pub mod export_from_disk;
 pub mod import;
 pub(crate) mod reading;
@@ -23,42 +24,52 @@ pub mod upgrade;
 
 use std::collections::BTreeMap;
 
+use admin::MetaAdminClient;
+use clap::Args;
+use clap::CommandFactory;
 use clap::Parser;
+use clap::Subcommand;
 use databend_common_base::base::tokio;
 use databend_common_meta_client::MetaGrpcClient;
 use databend_common_meta_kvapi::kvapi::KVApi;
 use databend_common_meta_raft_store::config::RaftConfig;
-use databend_common_meta_raft_store::ondisk::DATA_VERSION;
 use databend_common_meta_sled_store::init_sled_db;
 use databend_common_tracing::init_logging;
 use databend_common_tracing::Config as LogConfig;
 use databend_common_tracing::FileConfig;
 use databend_meta::version::METASRV_COMMIT_VERSION;
 use serde::Deserialize;
-use serde::Serialize;
-
-// TODO(xuanwo)
-//
-// We should make metactl config keeps backward compatibility too.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Parser)]
-#[clap(about, version = &**METASRV_COMMIT_VERSION, author)]
-pub struct Config {
-    /// Run a command
-    #[clap(long, default_value = "")]
-    pub cmd: String,
 
+#[derive(Debug, Clone, Deserialize, Args)]
+pub struct GlobalArgs {
     #[clap(long, default_value = "INFO")]
     pub log_level: String,
 
-    #[clap(long)]
-    pub status: bool,
+    /// DEPRECATED: use subcommand instead.
+    #[clap(
+        long,
+        env = "METASRV_GRPC_API_ADDRESS",
+        default_value = "127.0.0.1:9191"
+    )]
+    pub grpc_api_address: String,
 
+    /// DEPRECATED: use subcommand instead.
     #[clap(long)]
     pub import: bool,
 
+    /// DEPRECATED: use subcommand instead.
     #[clap(long)]
     pub export: bool,
 
+    /// DEPRECATED: use subcommand instead.
+    ///
+    /// The dir to store persisted meta state, including raft logs, state machine etc.
+    #[clap(long)]
+    #[serde(alias = "kvsrv_raft_dir")]
+    pub raft_dir: Option<String>,
+
+    /// DEPRECATED: use subcommand instead.
+    ///
+    /// The number of JSON strings in an export stream item.
     ///
     /// Set this to a smaller value if you get gRPC message body too large error.
@@ -68,11 +79,43 @@ pub struct Config {
     #[clap(long)]
     pub export_chunk_size: Option<u64>,
 
-    #[clap(
-        long,
-        env = "METASRV_GRPC_API_ADDRESS",
-        default_value = "127.0.0.1:9191"
-    )]
+    /// DEPRECATED: use subcommand instead.
+    ///
+    /// When exporting raft data, this is the name of the save db file.
+    /// If `db` is empty, output the exported data as json to stdout instead.
+    /// When importing raft data, this is the name of the restored db file.
+    /// If `db` is empty, the restored data is from stdin instead.
+    #[clap(long, default_value = "")]
+    pub db: String,
+
+    /// DEPRECATED: use subcommand instead.
+    ///
+    /// initial_cluster format: node_id=endpoint,grpc_api_addr
+    #[clap(long)]
+    pub initial_cluster: Vec<String>,
+
+    /// DEPRECATED: use subcommand instead.
+    ///
+    /// The node id. Used in these cases:
+    ///
+    /// 1. when this server is not initialized, e.g. --boot or --single for the first time.
+    /// 2. --initial_cluster with new cluster node id.
+    ///
+    /// Otherwise this argument is ignored.
+    #[clap(long, default_value = "0")]
+    #[serde(alias = "kvsrv_id")]
+    pub id: u64,
+}
+
+#[derive(Debug, Clone, Deserialize, Args)]
+pub struct StatusArgs {
+    #[clap(long, default_value = "127.0.0.1:9191")]
+    pub grpc_api_address: String,
+}
+
+#[derive(Debug, Clone, Deserialize, Args)]
+pub struct ExportArgs {
+    #[clap(long, default_value = "127.0.0.1:9191")]
     pub grpc_api_address: String,
 
     /// The dir to store persisted meta state, including raft logs, state machine etc.
@@ -80,9 +123,50 @@ pub struct Config {
     #[clap(long)]
     #[serde(alias = "kvsrv_raft_dir")]
     pub raft_dir: Option<String>,
 
-    /// When export raft data, this is the name of the save db file.
+    /// The number of JSON strings in an export stream item.
+    ///
+    /// Set this to a smaller value if you get gRPC message body too large error.
+    /// This requires meta-service >= 1.2.315; for older versions, this argument is ignored.
+    ///
+    /// By default it is 32.
+    #[clap(long)]
+    pub chunk_size: Option<u64>,
+
+    /// The name of the save db file.
     /// If `db` is empty, output the exported data as json to stdout instead.
-    /// When import raft data, this is the name of the restored db file.
+    #[clap(long, default_value = "")]
+    pub db: String,
+
+    /// The node id. Used in these cases:
+    ///
+    /// 1. when this server is not initialized, e.g. --boot or --single for the first time.
+    /// 2. --initial_cluster with new cluster node id.
+    ///
+    /// Otherwise this argument is ignored.
+    #[clap(long, default_value = "0")]
+    #[serde(alias = "kvsrv_id")]
+    pub id: u64,
+}
+
+impl From<ExportArgs> for RaftConfig {
+    #[allow(clippy::field_reassign_with_default)]
+    fn from(value: ExportArgs) -> Self {
+        let mut c = Self::default();
+
+        c.raft_dir = value.raft_dir.unwrap_or_default();
+        c.id = value.id;
+        c
+    }
+}
+
+#[derive(Debug, Clone, Deserialize, Args)]
+pub struct ImportArgs {
+    /// The dir to store persisted meta state, including raft logs, state machine etc.
+    #[clap(long)]
+    #[serde(alias = "kvsrv_raft_dir")]
+    pub raft_dir: Option<String>,
+
+    /// The name of the restored db file.
     /// If `db` is empty, the restored data is from stdin instead.
#[clap(long, default_value = "")] pub db: String, @@ -102,9 +186,9 @@ pub struct Config { pub id: u64, } -impl From for RaftConfig { +impl From for RaftConfig { #[allow(clippy::field_reassign_with_default)] - fn from(value: Config) -> Self { + fn from(value: ImportArgs) -> Self { let mut c = Self::default(); c.raft_dir = value.raft_dir.unwrap_or_default(); @@ -113,6 +197,144 @@ impl From for RaftConfig { } } +#[derive(Debug, Clone, Deserialize, Args)] +pub struct TransferLeaderArgs { + #[clap(long)] + pub to: Option, + + #[clap(long, default_value = "127.0.0.1:28002")] + pub admin_api_address: String, +} + +#[derive(Debug, Clone, Deserialize, Args)] +pub struct BenchArgs { + #[clap(long, default_value = "127.0.0.1:9191")] + pub grpc_api_address: String, +} + +#[derive(Debug, Deserialize, Parser)] +#[clap(name = "databend-metactl", about, version = &**METASRV_COMMIT_VERSION, author)] +struct App { + #[clap(subcommand)] + command: Option, + + #[clap(flatten)] + globals: GlobalArgs, +} + +impl App { + fn print_help(&self) -> anyhow::Result<()> { + let mut cmd = Self::command(); + cmd.print_help()?; + Ok(()) + } + + async fn show_status(&self, args: &StatusArgs) -> anyhow::Result<()> { + let addr = args.grpc_api_address.clone(); + let client = MetaGrpcClient::try_create(vec![addr], "root", "xxx", None, None, None)?; + + let res = client.get_cluster_status().await?; + println!("BinaryVersion: {}", res.binary_version); + println!("DataVersion: {}", res.data_version); + println!("DBSize: {}", res.db_size); + println!("KeyNumber: {}", res.key_num); + println!("Node: id={} raft={}", res.id, res.endpoint); + println!("State: {}", res.state); + if let Some(leader) = res.leader { + println!("Leader: {}", leader); + } + println!("CurrentTerm: {}", res.current_term); + println!("LastSeq: {:?}", res.last_seq); + println!("LastLogIndex: {}", res.last_log_index); + println!("LastApplied: {}", res.last_applied); + if let Some(last_log_id) = res.snapshot_last_log_id { + println!("SnapshotLastLogID: {}", last_log_id); + } + if let Some(purged) = res.purged { + println!("Purged: {}", purged); + } + if !res.replication.is_empty() { + println!("Replication:"); + for (k, v) in res.replication { + if v != res.last_applied { + println!(" - [{}] {} *", k, v); + } else { + println!(" - [{}] {}", k, v); + } + } + } + if !res.voters.is_empty() { + println!("Voters:"); + for v in res.voters { + println!(" - {}", v); + } + } + if !res.non_voters.is_empty() { + println!("NonVoters:"); + for v in res.non_voters { + println!(" - {}", v); + } + } + Ok(()) + } + + async fn bench_client_num_conn(&self, args: &BenchArgs) -> anyhow::Result<()> { + let addr = args.grpc_api_address.clone(); + println!( + "loop: connect to metasrv {}, get_kv('foo'), do not drop the connection", + addr + ); + let mut clients = vec![]; + let mut i = 0; + loop { + i += 1; + let client = + MetaGrpcClient::try_create(vec![addr.clone()], "root", "xxx", None, None, None)?; + let res = client.get_kv("foo").await; + println!("{}-th: get_kv(foo): {:?}", i, res); + clients.push(client); + } + } + + async fn transfer_leader(&self, args: &TransferLeaderArgs) -> anyhow::Result<()> { + let client = MetaAdminClient::new(args.admin_api_address.as_str()); + let result = client.transfer_leader(args.to).await?; + println!( + "triggered leader transfer from {} to {}.", + result.from, result.to + ); + println!("voter ids: {:?}", result.voter_ids); + Ok(()) + } + + async fn export(&self, args: &ExportArgs) -> anyhow::Result<()> { + match args.raft_dir { + None => { + 
+                export_from_grpc::export_from_running_node(args).await?;
+            }
+            Some(ref dir) => {
+                init_sled_db(dir.clone(), 64 * 1024 * 1024 * 1024);
+                export_from_disk::export_from_dir(args).await?;
+            }
+        }
+        Ok(())
+    }
+
+    async fn import(&self, args: &ImportArgs) -> anyhow::Result<()> {
+        import::import_data(args).await?;
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, Deserialize, Subcommand)]
+enum CtlCommand {
+    Status(StatusArgs),
+    Export(ExportArgs),
+    Import(ImportArgs),
+    TransferLeader(TransferLeaderArgs),
+    BenchClientNumConn(BenchArgs),
+}
+
 /// Usage:
 /// - To dump a sled db: `$0 --raft-dir ./_your_meta_dir/`:
 ///   ```
@@ -124,12 +346,12 @@ impl From<Config> for RaftConfig {
 ///   ```
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let config = Config::parse();
+    let app = App::parse();
 
     let log_config = LogConfig {
         file: FileConfig {
             on: true,
-            level: config.log_level.clone(),
+            level: app.globals.log_level.clone(),
             dir: ".databend/logs".to_string(),
             format: "text".to_string(),
             limit: 48,
@@ -137,136 +359,50 @@ async fn main() -> anyhow::Result<()> {
         },
         ..Default::default()
     };
-
     let _guards = init_logging("metactl", &log_config, BTreeMap::new());
 
-    if config.status {
-        return show_status(&config).await;
-    }
-
-    eprintln!();
-    eprintln!("╔╦╗╔═╗╔╦╗╔═╗   ╔═╗╔╦╗╦  ");
-    eprintln!("║║║║╣  ║ ╠═╣───║   ║ ║  ");
-    eprintln!("╩ ╩╚═╝ ╩ ╩ ╩   ╚═╝ ╩ ╩═╝ Databend");
-    eprintln!();
-    eprintln!("Version: {}", METASRV_COMMIT_VERSION.as_str());
-    eprintln!("Working DataVersion: {:?}", DATA_VERSION);
-    eprintln!();
-    eprintln!("Id: {}", config.id);
-    eprintln!("Log:");
-    eprintln!("    File: {}", log_config.file);
-    eprintln!("    Stderr: {}", log_config.stderr);
-
-    if !config.cmd.is_empty() {
-        return match config.cmd.as_str() {
-            "bench-client-conn-num" => {
-                bench_client_num_conn(&config).await?;
-                Ok(())
+    match app.command {
+        Some(ref cmd) => match cmd {
+            CtlCommand::Status(args) => {
+                app.show_status(args).await?;
             }
-
-            _ => {
-                eprintln!("valid commands are");
-                eprintln!("  --cmd bench-client-conn-num");
-                eprintln!("    Keep create new connections to metasrv.");
-                eprintln!("    Requires --grpc-api-address.");
-
-                Err(anyhow::anyhow!("unknown cmd: {}", config.cmd))
+            CtlCommand::BenchClientNumConn(args) => {
+                app.bench_client_num_conn(args).await?;
             }
-        };
-    }
-
-    if config.export {
-        return export_data(&config).await;
-    }
-
-    if config.import {
-        return import::import_data(&config).await;
-    }
-
-    Err(anyhow::anyhow!("Nothing to do"))
-}
-
-async fn bench_client_num_conn(conf: &Config) -> anyhow::Result<()> {
-    let addr = &conf.grpc_api_address;
-
-    println!(
-        "loop: connect to metasrv {}, get_kv('foo'), do not drop the connection",
-        addr
-    );
-
-    let mut clients = vec![];
-    let mut i = 0;
-
-    loop {
-        i += 1;
-        let client =
-            MetaGrpcClient::try_create(vec![addr.to_string()], "root", "xxx", None, None, None)?;
-
-        let res = client.get_kv("foo").await;
-        println!("{}-th: get_kv(foo): {:?}", i, res);
-
-        clients.push(client);
-    }
-}
-
-async fn show_status(conf: &Config) -> anyhow::Result<()> {
-    let addr = &conf.grpc_api_address;
-
-    let client =
-        MetaGrpcClient::try_create(vec![addr.to_string()], "root", "xxx", None, None, None)?;
-
-    let res = client.get_cluster_status().await?;
-    println!("BinaryVersion: {}", res.binary_version);
-    println!("DataVersion: {}", res.data_version);
-    println!("DBSize: {}", res.db_size);
-    println!("KeyNumber: {}", res.key_num);
-    println!("Node: id={} raft={}", res.id, res.endpoint);
-    println!("State: {}", res.state);
-    if let Some(leader) = res.leader {
-        println!("Leader: {}", leader);
-    }
-    println!("CurrentTerm: {}", res.current_term);
-    println!("LastSeq: {:?}", res.last_seq);
-    println!("LastLogIndex: {}", res.last_log_index);
-    println!("LastApplied: {}", res.last_applied);
-    if let Some(last_log_id) = res.snapshot_last_log_id {
-        println!("SnapshotLastLogID: {}", last_log_id);
-    }
-    if let Some(purged) = res.purged {
-        println!("Purged: {}", purged);
-    }
-    if !res.replication.is_empty() {
-        println!("Replication:");
-        for (k, v) in res.replication {
-            if v != res.last_applied {
-                println!("  - [{}] {} *", k, v);
+            CtlCommand::TransferLeader(args) => {
+                app.transfer_leader(args).await?;
+            }
+            CtlCommand::Export(args) => {
+                app.export(args).await?;
+            }
+            CtlCommand::Import(args) => {
+                app.import(args).await?;
+            }
+        },
+        // for backward compatibility
+        None => {
+            if app.globals.export {
+                let args = ExportArgs {
+                    grpc_api_address: app.globals.grpc_api_address.clone(),
+                    raft_dir: app.globals.raft_dir.clone(),
+                    db: app.globals.db.clone(),
+                    id: app.globals.id,
+                    chunk_size: app.globals.export_chunk_size,
+                };
+                app.export(&args).await?;
+            } else if app.globals.import {
+                let args = ImportArgs {
+                    raft_dir: app.globals.raft_dir.clone(),
+                    db: app.globals.db.clone(),
+                    id: app.globals.id,
+                    initial_cluster: app.globals.initial_cluster.clone(),
+                };
+                app.import(&args).await?;
             } else {
-                println!("  - [{}] {}", k, v);
+                app.print_help()?;
             }
         }
     }
-    if !res.voters.is_empty() {
-        println!("Voters:");
-        for v in res.voters {
-            println!("  - {}", v);
-        }
-    }
-    if !res.non_voters.is_empty() {
-        println!("NonVoters:");
-        for v in res.non_voters {
-            println!("  - {}", v);
-        }
-    }
-    Ok(())
-}
-
-pub async fn export_data(config: &Config) -> anyhow::Result<()> {
-    match config.raft_dir {
-        None => export_from_grpc::export_from_running_node(config).await?,
-        Some(ref dir) => {
-            init_sled_db(dir.clone(), 64 * 1024 * 1024 * 1024);
-            export_from_disk::export_from_dir(config).await?;
-        }
-    }
 
     Ok(())
 }
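Note: clap derives kebab-case subcommand names from the `CtlCommand` variants, so the new invocations should look like the following sketch (the flags come from the `Args` structs above; the addresses shown are their declared defaults):

    databend-metactl status --grpc-api-address 127.0.0.1:9191
    databend-metactl export --raft-dir ./_your_meta_dir/ --db output.db
    databend-metactl import --raft-dir ./_your_meta_dir/ --db output.db
    databend-metactl transfer-leader --admin-api-address 127.0.0.1:28002 --to 2
    databend-metactl bench-client-num-conn --grpc-api-address 127.0.0.1:9191

The legacy flag form (e.g. `databend-metactl --export --raft-dir ...`) is still routed through the `None` arm above, and running with no subcommand and no flags now prints help instead of returning the old "Nothing to do" error.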
diff --git a/src/meta/binaries/metactl/upgrade.rs b/src/meta/binaries/metactl/upgrade.rs
index adca1faae505..18678b9a9c48 100644
--- a/src/meta/binaries/metactl/upgrade.rs
+++ b/src/meta/binaries/metactl/upgrade.rs
@@ -16,15 +16,11 @@
 use databend_common_meta_raft_store::config::RaftConfig;
 use databend_common_meta_raft_store::ondisk::OnDisk;
 use databend_common_meta_sled_store::get_sled_db;
 
-use crate::Config;
-
 /// Upgrade the data in raft_dir to the latest version.
-pub async fn upgrade(config: &Config) -> anyhow::Result<()> {
-    let raft_config: RaftConfig = config.clone().into();
-
+pub async fn upgrade(raft_config: &RaftConfig) -> anyhow::Result<()> {
     let db = get_sled_db();
 
-    let mut on_disk = OnDisk::open(&db, &raft_config).await?;
+    let mut on_disk = OnDisk::open(&db, raft_config).await?;
     on_disk.log_stderr(true);
 
     on_disk.upgrade().await?;
diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.py b/tests/suites/1_stateful/09_http_handler/09_0007_token.py
index 5b7ccab59529..7921d4f2505b 100755
--- a/tests/suites/1_stateful/09_http_handler/09_0007_token.py
+++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.py
@@ -43,7 +43,10 @@ def do_renew(_case_id, refresh_token, session_token):
     payload = {"session_token": session_token}
     response = requests.post(
         renew_url,
-        headers={"Content-Type": "application/json", "Authorization": f"Bearer {refresh_token}"},
+        headers={
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {refresh_token}",
+        },
         json=payload,
     )
     return response.json()
@@ -54,15 +57,26 @@ def do_query(query, session_token):
     query_payload = {"sql": query, "pagination": {"wait_time_secs": 11}}
     response = requests.post(
         query_url,
-        headers={"Content-Type": "application/json", "Authorization": f"Bearer {session_token}"},
+        headers={
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {session_token}",
+        },
         json=query_payload,
     )
     return response.json()
 
 
 def fake_expired_token():
-    expired_claim = {"exp": int(time.time()) - 10, "tenant": "", "user": "", "nonce": "", "sid": ""}
-    return "bend-v1-" + base64.b64encode(json.dumps(expired_claim).encode("utf-8")).decode("utf-8")
+    expired_claim = {
+        "exp": int(time.time()) - 10,
+        "tenant": "",
+        "user": "",
+        "nonce": "",
+        "sid": "",
+    }
+    return "bend-v1-" + base64.b64encode(
+        json.dumps(expired_claim).encode("utf-8")
+    ).decode("utf-8")
 
 
 def main():