refactor(metactl): change to subcommands & support transfer-leader #16254

Merged · 30 commits · Aug 15, 2024
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions src/meta/binaries/Cargo.toml
@@ -44,6 +44,7 @@ fastrace = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
 rand = { workspace = true }
+reqwest = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 tokio = { workspace = true }
92 changes: 92 additions & 0 deletions src/meta/binaries/metactl/admin.rs
@@ -0,0 +1,92 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use reqwest::Client;
use serde::Deserialize;

pub struct MetaAdminClient {
    client: Client,
    endpoint: String,
}

impl MetaAdminClient {
    pub fn new(addr: &str) -> Self {
        let client = Client::new();
        MetaAdminClient {
            client,
            endpoint: format!("http://{}", addr),
        }
    }

    pub async fn status(&self) -> anyhow::Result<AdminStatusResponse> {
        let resp = self
            .client
            .get(format!("{}/v1/cluster/status", self.endpoint))
            .send()
            .await?;
        let status = resp.status();
        if status.is_success() {
            let result = resp.json::<AdminStatusResponse>().await?;
            Ok(result)
        } else {
            let data = resp.bytes().await?;
            let msg = String::from_utf8_lossy(&data);
            Err(anyhow::anyhow!("status code: {}, msg: {}", status, msg))
        }
    }

    pub async fn transfer_leader(
        &self,
        target: Option<u64>,
    ) -> anyhow::Result<AdminTransferLeaderResponse> {
        let resp = match target {
            Some(to) => {
                self.client
                    .get(format!(
                        "{}/v1/ctrl/trigger_transfer_leader?to={}",
                        self.endpoint, to
                    ))
                    .send()
                    .await?
            }
            None => {
                self.client
                    .get(format!("{}/v1/ctrl/trigger_transfer_leader", self.endpoint))
                    .send()
                    .await?
            }
        };
        let status = resp.status();
        if status.is_success() {
            let result = resp.json::<AdminTransferLeaderResponse>().await?;
            Ok(result)
        } else {
            let data = resp.bytes().await?;
            let msg = String::from_utf8_lossy(&data);
            Err(anyhow::anyhow!("status code: {}, msg: {}", status, msg))
        }
    }
}

#[derive(Deserialize, Debug)]
pub struct AdminStatusResponse {
    pub name: String,
}

#[derive(Deserialize, Debug)]
pub struct AdminTransferLeaderResponse {
    pub from: u64,
    pub to: u64,
    pub voter_ids: Vec<u64>,
}
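For context, a minimal usage sketch of the client above. The admin address (127.0.0.1:28002) and the target node id are illustrative only, not values from this PR; the sketch assumes MetaAdminClient is in scope and that the request reaches a node exposing the admin API.

// Hypothetical driver for MetaAdminClient; address and node id are
// made-up examples, not defaults documented by this PR.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = MetaAdminClient::new("127.0.0.1:28002");

    // Query the node's status (only the node name is decoded here).
    let status = client.status().await?;
    println!("connected to: {}", status.name);

    // Request leadership transfer to node 2; pass None to let the
    // current leader pick a target on its own.
    let resp = client.transfer_leader(Some(2)).await?;
    println!(
        "leader moved from {} to {}, voters: {:?}",
        resp.from, resp.to, resp.voter_ids
    );
    Ok(())
}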
15 changes: 7 additions & 8 deletions src/meta/binaries/metactl/export_from_disk.rs
@@ -21,30 +21,29 @@ use databend_meta::store::StoreInner;
 use futures::TryStreamExt;
 
 use crate::upgrade;
-use crate::Config;
+use crate::ExportArgs;
 
 /// Print the entire sled db.
 ///
 /// The output encodes every key-value into one line:
 /// `[sled_tree_name, {key_space: {key, value}}]`
 /// E.g.:
 /// `["state_machine/0",{"GenericKV":{"key":"wow","value":{"seq":3,"meta":null,"data":[119,111,119]}}}`
-pub async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
-    upgrade::upgrade(config).await?;
+pub async fn export_from_dir(args: &ExportArgs) -> anyhow::Result<()> {
+    let raft_config: RaftConfig = args.clone().into();
+    upgrade::upgrade(&raft_config).await?;
 
     eprintln!();
     eprintln!("Export:");
 
-    let raft_config: RaftConfig = config.clone().into();
-
     let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?;
     let mut lines = Arc::new(sto_inn).export();
 
     eprintln!(" From: {}", raft_config.raft_dir);
 
-    let file: Option<File> = if !config.db.is_empty() {
-        eprintln!(" To: File: {}", config.db);
-        Some((File::create(&config.db))?)
+    let file: Option<File> = if !args.db.is_empty() {
+        eprintln!(" To: File: {}", args.db);
+        Some((File::create(&args.db))?)
     } else {
         eprintln!(" To: <stdout>");
         None
23 changes: 9 additions & 14 deletions src/meta/binaries/metactl/export_from_grpc.rs
@@ -25,24 +25,19 @@ use databend_common_meta_types::protobuf;
 use tokio::net::TcpSocket;
 use tokio_stream::StreamExt;
 
-use crate::Config;
+use crate::ExportArgs;
 
 /// Dump metasrv data, raft-log, state machine etc in json to stdout.
-pub async fn export_from_running_node(config: &Config) -> Result<(), anyhow::Error> {
+pub async fn export_from_running_node(args: &ExportArgs) -> Result<(), anyhow::Error> {
     eprintln!();
     eprintln!("Export:");
-    eprintln!(" From: online meta-service: {}", config.grpc_api_address);
-    eprintln!(" Export To: {}", config.db);
-    eprintln!(" Export Chunk Size: {:?}", config.export_chunk_size);
-
-    let grpc_api_addr = get_available_socket_addr(&config.grpc_api_address).await?;
-
-    export_from_grpc(
-        grpc_api_addr.to_string().as_str(),
-        config.db.clone(),
-        config.export_chunk_size,
-    )
-    .await?;
+    eprintln!(" From: online meta-service: {}", args.grpc_api_address);
+    eprintln!(" Export To: {}", args.db);
+    eprintln!(" Export Chunk Size: {:?}", args.chunk_size);
+
+    let grpc_api_addr = get_available_socket_addr(args.grpc_api_address.as_str()).await?;
+    let addr = grpc_api_addr.to_string();
+    export_from_grpc(addr.as_str(), args.db.clone(), args.chunk_size).await?;
     Ok(())
 }
 
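The ExportArgs type itself is outside this diff. Judging from the call sites above (grpc_api_address, db, chunk_size, plus the Into<RaftConfig> conversion used by the disk path), it plausibly looks like the clap struct below. This is a hedged reconstruction: the field names come from their uses in this diff, while every clap attribute and default is an assumption, not the PR's actual definition.

use clap::Parser;

// Speculative shape of ExportArgs, inferred from how the export code
// reads it; not the definition merged in this PR.
#[derive(Debug, Clone, Parser)]
pub struct ExportArgs {
    /// Address of a running meta-service to export from.
    #[clap(long)]
    pub grpc_api_address: String,

    /// Output file; an empty string means write to stdout.
    #[clap(long, default_value = "")]
    pub db: String,

    /// Number of records per export-stream chunk.
    #[clap(long)]
    pub chunk_size: Option<u64>,

    /// Local raft-log directory, for exporting from a stopped node.
    #[clap(long)]
    pub raft_dir: Option<String>,
}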
59 changes: 29 additions & 30 deletions src/meta/binaries/metactl/import.rs
@@ -55,38 +55,38 @@ use url::Url;
 
 use crate::reading;
 use crate::upgrade;
-use crate::Config;
+use crate::ImportArgs;
 
-pub async fn import_data(config: &Config) -> anyhow::Result<()> {
-    let raft_dir = config.raft_dir.clone().unwrap_or_default();
+pub async fn import_data(args: &ImportArgs) -> anyhow::Result<()> {
+    let raft_dir = args.raft_dir.clone().unwrap_or_default();
 
     eprintln!();
     eprintln!("Import:");
     eprintln!(" Into Meta Dir: '{}'", raft_dir);
-    eprintln!(" Initialize Cluster with Id: {}, cluster: {{", config.id);
-    for peer in config.initial_cluster.clone() {
+    eprintln!(" Initialize Cluster with Id: {}, cluster: {{", args.id);
+    for peer in args.initial_cluster.clone() {
         eprintln!(" Peer: {}", peer);
     }
     eprintln!(" }}");
 
-    let nodes = build_nodes(config.initial_cluster.clone(), config.id)?;
+    let nodes = build_nodes(args.initial_cluster.clone(), args.id)?;
 
     init_sled_db(raft_dir.clone(), 64 * 1024 * 1024 * 1024);
 
-    clear(config)?;
-    let max_log_id = import_from_stdin_or_file(config).await?;
+    clear(args)?;
+    let max_log_id = import_from_stdin_or_file(args).await?;
 
-    if config.initial_cluster.is_empty() {
+    if args.initial_cluster.is_empty() {
         return Ok(());
     }
 
-    init_new_cluster(config, nodes, max_log_id, config.id).await?;
+    init_new_cluster(args, nodes, max_log_id).await?;
     Ok(())
 }
 
 /// Import from lines of exported data and Return the max log id that is found.
 async fn import_lines<B: BufRead + 'static>(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: Lines<B>,
 ) -> anyhow::Result<Option<LogId>> {
     #[allow(clippy::useless_conversion)]
@@ -106,8 +106,8 @@ async fn import_lines<B: BufRead + 'static>(
                 please use an older version databend-metactl to import from V001"
             ));
         }
-        DataVersion::V002 => import_v002(config, it).await?,
-        DataVersion::V003 => import_v003(config, it).await?,
+        DataVersion::V002 => import_v002(raft_config, it).await?,
+        DataVersion::V003 => import_v003(raft_config, it).await?,
     };
 
     Ok(max_log_id)
@@ -119,11 +119,11 @@ async fn import_lines<B: BufRead + 'static>(
 ///
 /// It write logs and related entries to sled trees, and state_machine entries to a snapshot.
 async fn import_v002(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: impl IntoIterator<Item = Result<String, io::Error>>,
 ) -> anyhow::Result<Option<LogId>> {
     // v002 and v003 share the same exported data format.
-    import_v003(config, lines).await
+    import_v003(raft_config, lines).await
 }
 
 /// Import serialized lines for `DataVersion::V003`
@@ -132,11 +132,9 @@ async fn import_v002(
 ///
 /// It write logs and related entries to sled trees, and state_machine entries to a snapshot.
 async fn import_v003(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: impl IntoIterator<Item = Result<String, io::Error>>,
 ) -> anyhow::Result<Option<LogId>> {
-    let raft_config: RaftConfig = config.clone().into();
-
     let db = get_sled_db();
 
     let mut n = 0;
@@ -221,24 +219,26 @@ async fn import_v003(
 /// Insert them into sled db and flush.
 ///
 /// Finally upgrade the data in raft_dir to the latest version.
-async fn import_from_stdin_or_file(config: &Config) -> anyhow::Result<Option<LogId>> {
-    let restore = config.db.clone();
+async fn import_from_stdin_or_file(args: &ImportArgs) -> anyhow::Result<Option<LogId>> {
+    let restore = args.db.clone();
 
+    let raft_config: RaftConfig = args.clone().into();
     let max_log_id = if restore.is_empty() {
         eprintln!(" From: <stdin>");
         let lines = io::stdin().lines();
 
-        import_lines(config, lines).await?
+        import_lines(raft_config, lines).await?
     } else {
-        eprintln!(" From: {}", config.db);
+        eprintln!(" From: {}", args.db);
         let file = File::open(restore)?;
         let reader = BufReader::new(file);
         let lines = reader.lines();
 
-        import_lines(config, lines).await?
+        import_lines(raft_config, lines).await?
     };
 
-    upgrade::upgrade(config).await?;
+    let raft_config: RaftConfig = args.clone().into();
+    upgrade::upgrade(&raft_config).await?;
 
     Ok(max_log_id)
 }
@@ -298,16 +298,15 @@ fn build_nodes(initial_cluster: Vec<String>, id: u64) -> anyhow::Result<BTreeMap
 
 // initial_cluster format: node_id=endpoint,grpc_api_addr;
 async fn init_new_cluster(
-    config: &Config,
+    args: &ImportArgs,
     nodes: BTreeMap<NodeId, Node>,
     max_log_id: Option<LogId>,
-    id: u64,
 ) -> anyhow::Result<()> {
     eprintln!();
     eprintln!("Initialize Cluster with: {:?}", nodes);
 
     let db = get_sled_db();
-    let raft_config: RaftConfig = config.clone().into();
+    let raft_config: RaftConfig = args.clone().into();
 
     let mut sto = RaftStore::open_create(&raft_config, Some(()), None).await?;
 
@@ -375,13 +374,13 @@ async fn init_new_cluster(
 
     // Reset node id
     let raft_state = RaftState::open_create(&db, &raft_config, Some(()), None).await?;
-    raft_state.set_node_id(id).await?;
+    raft_state.set_node_id(args.id).await?;
 
     Ok(())
 }
 
 /// Clear all sled data and on-disk snapshot.
-fn clear(config: &Config) -> anyhow::Result<()> {
+fn clear(args: &ImportArgs) -> anyhow::Result<()> {
     eprintln!();
     eprintln!("Clear All Sled Trees Before Import:");
     let db = get_sled_db();
@@ -394,7 +393,7 @@ fn clear(config: &Config) -> anyhow::Result<()> {
         eprintln!(" Cleared sled tree: {}", name);
     }
 
-    let df_meta_path = format!("{}/df_meta", config.raft_dir.clone().unwrap_or_default());
+    let df_meta_path = format!("{}/df_meta", args.raft_dir.clone().unwrap_or_default());
     if Path::new(&df_meta_path).exists() {
        remove_dir_all(&df_meta_path)?;
    }
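Likewise, ImportArgs is referenced but not defined in this diff. From its uses in import_data, import_from_stdin_or_file, init_new_cluster, and clear, a plausible shape is sketched below; the field names come from the call sites, while the clap attributes, defaults, and the (omitted) From<ImportArgs> for RaftConfig impl are assumptions.

use clap::Parser;

// Speculative shape of ImportArgs, inferred from its call sites above;
// not the definition merged in this PR.
#[derive(Debug, Clone, Parser)]
pub struct ImportArgs {
    /// Raft directory to import into.
    #[clap(long)]
    pub raft_dir: Option<String>,

    /// Source file of exported lines; empty means read from stdin.
    #[clap(long, default_value = "")]
    pub db: String,

    /// Id this node takes in the re-initialized cluster.
    #[clap(long, default_value = "0")]
    pub id: u64,

    /// Peers, one `node_id=raft_endpoint,grpc_api_addr` entry per node;
    /// leave empty to import data without re-initializing the cluster.
    #[clap(long)]
    pub initial_cluster: Vec<String>,
}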