From d362bade300fdc07cbf225e4bb6b467391246c3e Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 18 Jul 2023 17:56:58 +0800 Subject: [PATCH 1/5] feat: add status field when heartbeat to meta --- Cargo.lock | 4 ++-- cluster/src/cluster_impl.rs | 2 +- cluster/src/shard_set.rs | 29 +++++++++++++++-------------- meta_client/src/types.rs | 36 ++++++++++++++++++++++++++++++++++++ router/src/cluster_based.rs | 1 + 5 files changed, 55 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ccfe5db0e..0f12762d13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1108,9 +1108,9 @@ dependencies = [ [[package]] name = "ceresdbproto" -version = "1.0.7" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff289995b93a0f20dd77342a2ac85447a68e62c03b56704b588630c4d98b08d3" +checksum = "25926e49d9d931b3089b26aba55cd5057631db452137f45d0d24f8b5dae8a28c" dependencies = [ "prost", "protoc-bin-vendored", diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index 9ff0295cea..f533a2b937 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -102,7 +102,7 @@ impl ClusterImpl { loop { let shard_infos = inner .shard_set - .all_opened_shards() + .all_shards() .iter() .map(|shard| shard.shard_info()) .collect(); diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs index bbbc3cc4aa..8977453e1d 100644 --- a/cluster/src/shard_set.rs +++ b/cluster/src/shard_set.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; -use meta_client::types::{ShardId, ShardInfo, TableInfo, TablesOfShard}; +use meta_client::types::{self, ShardId, ShardInfo, TableInfo, TablesOfShard}; use snafu::{ensure, OptionExt}; use crate::{ @@ -29,16 +29,6 @@ impl ShardSet { inner.values().cloned().collect() } - // Fetch all opened shards. - pub fn all_opened_shards(&self) -> Vec { - let inner = self.inner.read().unwrap(); - inner - .values() - .filter(|shard| shard.is_opened()) - .cloned() - .collect() - } - // Get the shard by its id. pub fn get(&self, shard_id: ShardId) -> Option { let inner = self.inner.read().unwrap(); @@ -87,7 +77,10 @@ impl Shard { pub fn shard_info(&self) -> ShardInfo { let data = self.data.read().unwrap(); - data.shard_info.clone() + ShardInfo { + status: Some(data.status.into()), + ..data.shard_info + } } pub fn find_table(&self, schema_name: &str, table_name: &str) -> Option { @@ -178,7 +171,7 @@ pub struct UpdatedTableInfo { /// When a open request comes in, shard can only be opened when it's in /// - `Init`, which means it has not been opened before. /// - `Opening`, which means it has been opened before, but failed. -#[derive(Debug, Default, PartialEq)] +#[derive(Copy, Clone, Debug, Default, PartialEq)] pub enum ShardStatus { /// Not allowed report to ceresmeta #[default] @@ -191,6 +184,15 @@ pub enum ShardStatus { Frozen, } +impl From for types::ShardStatus { + fn from(value: ShardStatus) -> Self { + match value { + ShardStatus::Init | ShardStatus::Opening => types::ShardStatus::PartialOpen, + ShardStatus::Ready | ShardStatus::Frozen => types::ShardStatus::Ready, + } + } +} + /// Shard data #[derive(Debug)] pub struct ShardData { @@ -201,7 +203,6 @@ pub struct ShardData { pub tables: Vec, /// Current status - /// The flow of shard status is: opening -> opened -> frozen pub status: ShardStatus, } diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs index b47a5325d4..9c75ade2e2 100644 --- a/meta_client/src/types.rs +++ b/meta_client/src/types.rs @@ -162,11 +162,23 @@ pub struct NodeInfo { pub shard_infos: Vec, } +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] +pub enum ShardStatus { + #[default] + Ready = 0, + PartialOpen, +} + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] pub struct ShardInfo { pub id: ShardId, pub role: ShardRole, pub version: ShardVersion, + // This status is only used for request ceresdb send to ceresmeta via heartbeat + // When ceresdb receive this via open shard request, this field is meanless. + // TODO: a better way to fix this to separate request and response between ceresdb and + // ceresmeta. + pub status: Option, } impl ShardInfo { @@ -235,6 +247,7 @@ impl From for meta_service_pb::ShardInfo { id: shard_info.id, role: role as i32, version: shard_info.version, + status: shard_info.status.map(|v| v as i32), } } } @@ -245,6 +258,29 @@ impl From<&meta_service_pb::ShardInfo> for ShardInfo { id: pb_shard_info.id, role: pb_shard_info.role().into(), version: pb_shard_info.version, + status: pb_shard_info.status.map(|v| { + meta_service_pb::shard_info::Status::from_i32(v) + .unwrap_or_default() + .into() + }), + } + } +} + +impl From for meta_service_pb::shard_info::Status { + fn from(v: ShardStatus) -> Self { + match v { + ShardStatus::Ready => meta_service_pb::shard_info::Status::Ready, + ShardStatus::PartialOpen => meta_service_pb::shard_info::Status::PartialOpen, + } + } +} + +impl From for ShardStatus { + fn from(v: meta_service_pb::shard_info::Status) -> Self { + match v { + meta_service_pb::shard_info::Status::Ready => ShardStatus::Ready, + meta_service_pb::shard_info::Status::PartialOpen => ShardStatus::PartialOpen, } } } diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index b9b8e73b1f..de2ee187c0 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -224,6 +224,7 @@ mod tests { id: 0, role: Leader, version: 100, + status: None, }, }], }, From 92a6500c0220256341eeedb89ff7cf992f870ccc Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 18 Jul 2023 22:29:41 +0800 Subject: [PATCH 2/5] refactor open shard --- cluster/src/shard_set.rs | 83 +++++++---------------- meta_client/src/types.rs | 66 ++++++++++-------- router/src/cluster_based.rs | 2 +- server/src/grpc/meta_event_service/mod.rs | 2 +- 4 files changed, 63 insertions(+), 90 deletions(-) diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs index 8977453e1d..a13c5628b2 100644 --- a/cluster/src/shard_set.rs +++ b/cluster/src/shard_set.rs @@ -2,16 +2,17 @@ use std::{collections::HashMap, sync::Arc}; -use meta_client::types::{self, ShardId, ShardInfo, TableInfo, TablesOfShard}; -use snafu::{ensure, OptionExt}; +use generic_error::BoxError; +use meta_client::types::{ShardId, ShardInfo, ShardStatus, TableInfo, TablesOfShard}; +use snafu::{ensure, OptionExt, ResultExt}; use crate::{ shard_operator::{ CloseContext, CloseTableContext, CreateTableContext, DropTableContext, OpenContext, OpenTableContext, ShardOperator, }, - OpenShardNoCause, Result, ShardVersionMismatch, TableAlreadyExists, TableNotFound, - UpdateFrozenShard, + OpenShardNoCause, OpenShardWithCause, Result, ShardVersionMismatch, TableAlreadyExists, + TableNotFound, UpdateFrozenShard, }; /// Shard set @@ -67,7 +68,6 @@ impl Shard { let data = Arc::new(std::sync::RwLock::new(ShardData { shard_info: tables_of_shard.shard_info, tables: tables_of_shard.tables, - status: ShardStatus::default(), })); let operator = tokio::sync::Mutex::new(ShardOperator { data: data.clone() }); @@ -77,10 +77,8 @@ impl Shard { pub fn shard_info(&self) -> ShardInfo { let data = self.data.read().unwrap(); - ShardInfo { - status: Some(data.status.into()), - ..data.shard_info - } + + data.shard_info.clone() } pub fn find_table(&self, schema_name: &str, table_name: &str) -> Option { @@ -89,12 +87,19 @@ impl Shard { } pub async fn open(&self, ctx: OpenContext) -> Result<()> { - let operator = self.operator.lock().await; + let operator = self + .operator + .try_lock() + .box_err() + .context(OpenShardWithCause { + msg: "Failed to get shard operator lock", + })?; + { let mut data = self.data.write().unwrap(); if !data.need_open() { return OpenShardNoCause { - msg: format!("Shard is already in opening, id:{}", data.shard_info.id), + msg: "Shard is already in opening", } .fail(); } @@ -154,45 +159,6 @@ pub struct UpdatedTableInfo { pub table_info: TableInfo, } -/// The status changes of a shard as following: -/// -///```plaintext -/// ┌────┐ -/// │Init│ -/// └──┬─┘ -/// ___▽___ -/// ╱ ╲ ┌─────┐ -/// ╱ Opening ╲____│Ready│ -/// ╲ ╱yes └──┬──┘ -/// ╲_______╱ ┌───▽──┐ -/// │Frozen│ -/// └──────┘ -/// ``` -/// When a open request comes in, shard can only be opened when it's in -/// - `Init`, which means it has not been opened before. -/// - `Opening`, which means it has been opened before, but failed. -#[derive(Copy, Clone, Debug, Default, PartialEq)] -pub enum ShardStatus { - /// Not allowed report to ceresmeta - #[default] - Init, - /// Not allowed report to ceresmeta - Opening, - /// Healthy - Ready, - /// Further updates are prohibited - Frozen, -} - -impl From for types::ShardStatus { - fn from(value: ShardStatus) -> Self { - match value { - ShardStatus::Init | ShardStatus::Opening => types::ShardStatus::PartialOpen, - ShardStatus::Ready | ShardStatus::Frozen => types::ShardStatus::Ready, - } - } -} - /// Shard data #[derive(Debug)] pub struct ShardData { @@ -201,9 +167,6 @@ pub struct ShardData { /// Tables in shard pub tables: Vec, - - /// Current status - pub status: ShardStatus, } impl ShardData { @@ -216,19 +179,19 @@ impl ShardData { #[inline] pub fn freeze(&mut self) { - self.status = ShardStatus::Frozen; + self.shard_info.status = ShardStatus::Frozen; } #[inline] pub fn begin_open(&mut self) { - self.status = ShardStatus::Opening; + self.shard_info.status = ShardStatus::Opening; } #[inline] pub fn finish_open(&mut self) { - assert_eq!(self.status, ShardStatus::Opening); + assert_eq!(self.shard_info.status, ShardStatus::Opening); - self.status = ShardStatus::Ready; + self.shard_info.status = ShardStatus::Ready; } #[inline] @@ -238,12 +201,12 @@ impl ShardData { #[inline] pub fn is_opened(&self) -> bool { - matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen) + self.shard_info.is_opened() } #[inline] fn is_frozen(&self) -> bool { - matches!(self.status, ShardStatus::Frozen) + matches!(self.shard_info.status, ShardStatus::Frozen) } pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { @@ -291,7 +254,7 @@ impl ShardData { } = updated_info; ensure!( - !matches!(self.status, ShardStatus::Frozen), + !self.is_frozen(), UpdateFrozenShard { shard_id: curr_shard.id, } diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs index 9c75ade2e2..dcccbb95b8 100644 --- a/meta_client/src/types.rs +++ b/meta_client/src/types.rs @@ -162,11 +162,34 @@ pub struct NodeInfo { pub shard_infos: Vec, } +/// The status changes of a shard as following: +/// +///```plaintext +/// ┌────┐ +/// │Init│ +/// └──┬─┘ +/// ___▽___ +/// ╱ ╲ ┌─────┐ +/// ╱ Opening ╲____│Ready│ +/// ╲ ╱yes └──┬──┘ +/// ╲_______╱ ┌───▽──┐ +/// │Frozen│ +/// └──────┘ +/// ``` +/// When an open request comes in, shard can only be opened when it's in +/// - `Init`, which means it has not been opened before. +/// - `Opening`, which means it has been opened before, but failed. #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] pub enum ShardStatus { + /// Not allowed report to ceresmeta #[default] - Ready = 0, - PartialOpen, + Init, + /// Not allowed report to ceresmeta + Opening, + /// Healthy + Ready, + /// Further updates are prohibited + Frozen, } #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] @@ -176,9 +199,9 @@ pub struct ShardInfo { pub version: ShardVersion, // This status is only used for request ceresdb send to ceresmeta via heartbeat // When ceresdb receive this via open shard request, this field is meanless. - // TODO: a better way to fix this to separate request and response between ceresdb and + // TODO: Use different request and response body between ceresdb and // ceresmeta. - pub status: Option, + pub status: ShardStatus, } impl ShardInfo { @@ -186,6 +209,11 @@ impl ShardInfo { pub fn is_leader(&self) -> bool { self.role == ShardRole::Leader } + + #[inline] + pub fn is_opened(&self) -> bool { + matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen) + } } #[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize)] @@ -247,7 +275,11 @@ impl From for meta_service_pb::ShardInfo { id: shard_info.id, role: role as i32, version: shard_info.version, - status: shard_info.status.map(|v| v as i32), + status: Some(if shard_info.is_opened() { + meta_service_pb::shard_info::Status::Ready + } else { + meta_service_pb::shard_info::Status::PartialOpen + } as i32), } } } @@ -258,29 +290,7 @@ impl From<&meta_service_pb::ShardInfo> for ShardInfo { id: pb_shard_info.id, role: pb_shard_info.role().into(), version: pb_shard_info.version, - status: pb_shard_info.status.map(|v| { - meta_service_pb::shard_info::Status::from_i32(v) - .unwrap_or_default() - .into() - }), - } - } -} - -impl From for meta_service_pb::shard_info::Status { - fn from(v: ShardStatus) -> Self { - match v { - ShardStatus::Ready => meta_service_pb::shard_info::Status::Ready, - ShardStatus::PartialOpen => meta_service_pb::shard_info::Status::PartialOpen, - } - } -} - -impl From for ShardStatus { - fn from(v: meta_service_pb::shard_info::Status) -> Self { - match v { - meta_service_pb::shard_info::Status::Ready => ShardStatus::Ready, - meta_service_pb::shard_info::Status::PartialOpen => ShardStatus::PartialOpen, + status: Default::default(), } } } diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index de2ee187c0..15b9a0e306 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -224,7 +224,7 @@ mod tests { id: 0, role: Leader, version: 100, - status: None, + status: Default::default(), }, }], }, diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index 69f76c49a6..b7f3bfe891 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -299,7 +299,7 @@ async fn do_open_shard(ctx: HandlerContext, shard_info: ShardInfo) -> Result<()> shard.open(open_ctx).await.box_err().context(ErrWithCause { code: StatusCode::Internal, - msg: "fail to open shard", + msg: format!("fail to open shard, id:{}", shard_info.id), }) } From abfc4d230c9868c6cb5f0c6ad74f04c877cc9e5b Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 18 Jul 2023 22:33:08 +0800 Subject: [PATCH 3/5] fix comments --- meta_client/src/types.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs index dcccbb95b8..fc0e1b3c5c 100644 --- a/meta_client/src/types.rs +++ b/meta_client/src/types.rs @@ -181,10 +181,10 @@ pub struct NodeInfo { /// - `Opening`, which means it has been opened before, but failed. #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] pub enum ShardStatus { - /// Not allowed report to ceresmeta + /// Created, but not opened #[default] Init, - /// Not allowed report to ceresmeta + /// In opening Opening, /// Healthy Ready, From be9e5dbe8061de1e076638c0a545b25abdb165c3 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 18 Jul 2023 22:44:07 +0800 Subject: [PATCH 4/5] flush after table open successfully --- catalog/src/table_operator.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/catalog/src/table_operator.rs b/catalog/src/table_operator.rs index d162168224..d7fdb27d9f 100644 --- a/catalog/src/table_operator.rs +++ b/catalog/src/table_operator.rs @@ -5,7 +5,10 @@ use std::time::Instant; use generic_error::BoxError; use log::{error, info, warn}; use snafu::{OptionExt, ResultExt}; -use table_engine::{engine, table::TableRef}; +use table_engine::{ + engine, + table::{FlushRequest, TableRef}, +}; use time_ext::InstantExt; use crate::{ @@ -74,6 +77,12 @@ impl TableOperator { match table_result { Ok(Some(table)) => { + // When table open successfully, try flush to reduce WAL size, so when reopen + // this shard the WAL required to fetch is reduced. + if let Err(e) = table.flush(FlushRequest { sync: false }).await { + warn!("Try flush after open table failed, err:{e}"); + } + schema.register_table(table); success_count += 1; } From 350daaa3e61d9e9f6020f4670e0a81d2d5189f6a Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 19 Jul 2023 18:15:46 +0800 Subject: [PATCH 5/5] remove flush --- catalog/src/table_operator.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/catalog/src/table_operator.rs b/catalog/src/table_operator.rs index d7fdb27d9f..d162168224 100644 --- a/catalog/src/table_operator.rs +++ b/catalog/src/table_operator.rs @@ -5,10 +5,7 @@ use std::time::Instant; use generic_error::BoxError; use log::{error, info, warn}; use snafu::{OptionExt, ResultExt}; -use table_engine::{ - engine, - table::{FlushRequest, TableRef}, -}; +use table_engine::{engine, table::TableRef}; use time_ext::InstantExt; use crate::{ @@ -77,12 +74,6 @@ impl TableOperator { match table_result { Ok(Some(table)) => { - // When table open successfully, try flush to reduce WAL size, so when reopen - // this shard the WAL required to fetch is reduced. - if let Err(e) = table.flush(FlushRequest { sync: false }).await { - warn!("Try flush after open table failed, err:{e}"); - } - schema.register_table(table); success_count += 1; }