Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add shard status when heartbeat to meta #1082

Merged
merged 5 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion catalog/src/table_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::time::Instant;
use generic_error::BoxError;
use log::{error, info, warn};
use snafu::{OptionExt, ResultExt};
use table_engine::{engine, table::TableRef};
use table_engine::{
engine,
table::{FlushRequest, TableRef},
};
use time_ext::InstantExt;

use crate::{
Expand Down Expand Up @@ -74,6 +77,12 @@ impl TableOperator {

match table_result {
Ok(Some(table)) => {
// When the table is opened successfully, try to flush it to reduce the WAL size,
// so that less WAL needs to be fetched when this shard is reopened.
if let Err(e) = table.flush(FlushRequest { sync: false }).await {
warn!("Try flush after open table failed, err:{e}");
}
Rachelint marked this conversation as resolved.
Show resolved Hide resolved

schema.register_table(table);
success_count += 1;
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl ClusterImpl {
loop {
let shard_infos = inner
.shard_set
.all_opened_shards()
.all_shards()
.iter()
.map(|shard| shard.shard_info())
.collect();
Expand Down
80 changes: 22 additions & 58 deletions cluster/src/shard_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

use std::{collections::HashMap, sync::Arc};

use meta_client::types::{ShardId, ShardInfo, TableInfo, TablesOfShard};
use snafu::{ensure, OptionExt};
use generic_error::BoxError;
use meta_client::types::{ShardId, ShardInfo, ShardStatus, TableInfo, TablesOfShard};
use snafu::{ensure, OptionExt, ResultExt};

use crate::{
shard_operator::{
CloseContext, CloseTableContext, CreateTableContext, DropTableContext, OpenContext,
OpenTableContext, ShardOperator,
},
OpenShardNoCause, Result, ShardVersionMismatch, TableAlreadyExists, TableNotFound,
UpdateFrozenShard,
OpenShardNoCause, OpenShardWithCause, Result, ShardVersionMismatch, TableAlreadyExists,
TableNotFound, UpdateFrozenShard,
};

/// Shard set
Expand All @@ -29,16 +30,6 @@ impl ShardSet {
inner.values().cloned().collect()
}

// Fetch all opened shards.
pub fn all_opened_shards(&self) -> Vec<ShardRef> {
let inner = self.inner.read().unwrap();
inner
.values()
.filter(|shard| shard.is_opened())
.cloned()
.collect()
}

// Get the shard by its id.
pub fn get(&self, shard_id: ShardId) -> Option<ShardRef> {
let inner = self.inner.read().unwrap();
Expand Down Expand Up @@ -77,7 +68,6 @@ impl Shard {
let data = Arc::new(std::sync::RwLock::new(ShardData {
shard_info: tables_of_shard.shard_info,
tables: tables_of_shard.tables,
status: ShardStatus::default(),
}));

let operator = tokio::sync::Mutex::new(ShardOperator { data: data.clone() });
Expand All @@ -87,6 +77,7 @@ impl Shard {

pub fn shard_info(&self) -> ShardInfo {
let data = self.data.read().unwrap();

data.shard_info.clone()
}

Expand All @@ -96,12 +87,19 @@ impl Shard {
}

pub async fn open(&self, ctx: OpenContext) -> Result<()> {
let operator = self.operator.lock().await;
let operator = self
.operator
.try_lock()
.box_err()
.context(OpenShardWithCause {
msg: "Failed to get shard operator lock",
})?;

{
let mut data = self.data.write().unwrap();
if !data.need_open() {
return OpenShardNoCause {
msg: format!("Shard is already in opening, id:{}", data.shard_info.id),
msg: "Shard is already in opening",
}
.fail();
}
Expand Down Expand Up @@ -161,36 +159,6 @@ pub struct UpdatedTableInfo {
pub table_info: TableInfo,
}

/// Lifecycle status of a shard.
///
/// The status of a shard changes as follows:
///
/// ```plaintext
///     ┌────┐
///     │Init│
///     └──┬─┘
///     ___▽___
///    ╱       ╲     ┌─────┐
///   ╱ Opening ╲____│Ready│
///   ╲         ╱yes └──┬──┘
///    ╲_______╱    ┌───▽──┐
///                 │Frozen│
///                 └──────┘
/// ```
///
/// When an open request comes in, a shard can only be opened when it is in
/// - `Init`, which means it has not been opened before.
/// - `Opening`, which means it has been opened before, but failed.
#[derive(Debug, Default, PartialEq)]
pub enum ShardStatus {
/// Initial state; not allowed to be reported to ceresmeta
#[default]
Init,
/// Being opened; not allowed to be reported to ceresmeta
Opening,
/// Healthy
Ready,
/// Further updates are prohibited
Frozen,
}

/// Shard data
#[derive(Debug)]
pub struct ShardData {
Expand All @@ -199,10 +167,6 @@ pub struct ShardData {

/// Tables in shard
pub tables: Vec<TableInfo>,

/// Current status
/// The flow of shard status is: opening -> opened -> frozen
pub status: ShardStatus,
}

impl ShardData {
Expand All @@ -215,19 +179,19 @@ impl ShardData {

#[inline]
pub fn freeze(&mut self) {
self.status = ShardStatus::Frozen;
self.shard_info.status = ShardStatus::Frozen;
}

#[inline]
pub fn begin_open(&mut self) {
self.status = ShardStatus::Opening;
self.shard_info.status = ShardStatus::Opening;
}

#[inline]
pub fn finish_open(&mut self) {
assert_eq!(self.status, ShardStatus::Opening);
assert_eq!(self.shard_info.status, ShardStatus::Opening);

self.status = ShardStatus::Ready;
self.shard_info.status = ShardStatus::Ready;
}

#[inline]
Expand All @@ -237,12 +201,12 @@ impl ShardData {

#[inline]
pub fn is_opened(&self) -> bool {
matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen)
self.shard_info.is_opened()
}

#[inline]
fn is_frozen(&self) -> bool {
matches!(self.status, ShardStatus::Frozen)
matches!(self.shard_info.status, ShardStatus::Frozen)
}

pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> {
Expand Down Expand Up @@ -290,7 +254,7 @@ impl ShardData {
} = updated_info;

ensure!(
!matches!(self.status, ShardStatus::Frozen),
!self.is_frozen(),
UpdateFrozenShard {
shard_id: curr_shard.id,
}
Expand Down
46 changes: 46 additions & 0 deletions meta_client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,58 @@ pub struct NodeInfo {
pub shard_infos: Vec<ShardInfo>,
}

/// Lifecycle status of a shard.
///
/// The status of a shard changes as follows:
///
/// ```plaintext
///     ┌────┐
///     │Init│
///     └──┬─┘
///     ___▽___
///    ╱       ╲     ┌─────┐
///   ╱ Opening ╲____│Ready│
///   ╲         ╱yes └──┬──┘
///    ╲_______╱    ┌───▽──┐
///                 │Frozen│
///                 └──────┘
/// ```
///
/// When an open request comes in, a shard can only be opened when it is in
/// - `Init`, which means it has not been opened before.
/// - `Opening`, which means it has been opened before, but failed.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)]
pub enum ShardStatus {
/// Created, but not opened yet
#[default]
Init,
/// Currently being opened
Opening,
/// Healthy
Ready,
/// Further updates are prohibited
Frozen,
}

#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)]
pub struct ShardInfo {
Rachelint marked this conversation as resolved.
Show resolved Hide resolved
pub id: ShardId,
pub role: ShardRole,
pub version: ShardVersion,
// This status is only used in the requests that ceresdb sends to ceresmeta via
// heartbeat. When ceresdb receives it via an open shard request, this field is
// meaningless.
// TODO: Use different request and response bodies between ceresdb and
// ceresmeta.
pub status: ShardStatus,
}

impl ShardInfo {
#[inline]
pub fn is_leader(&self) -> bool {
self.role == ShardRole::Leader
}

#[inline]
pub fn is_opened(&self) -> bool {
matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen)
}
Rachelint marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize)]
Expand Down Expand Up @@ -235,6 +275,11 @@ impl From<ShardInfo> for meta_service_pb::ShardInfo {
id: shard_info.id,
role: role as i32,
version: shard_info.version,
status: Some(if shard_info.is_opened() {
meta_service_pb::shard_info::Status::Ready
} else {
meta_service_pb::shard_info::Status::PartialOpen
} as i32),
}
}
}
Expand All @@ -245,6 +290,7 @@ impl From<&meta_service_pb::ShardInfo> for ShardInfo {
id: pb_shard_info.id,
role: pb_shard_info.role().into(),
version: pb_shard_info.version,
status: Default::default(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions router/src/cluster_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ mod tests {
id: 0,
role: Leader,
version: 100,
status: Default::default(),
},
}],
},
Expand Down
2 changes: 1 addition & 1 deletion server/src/grpc/meta_event_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ async fn do_open_shard(ctx: HandlerContext, shard_info: ShardInfo) -> Result<()>

shard.open(open_ctx).await.box_err().context(ErrWithCause {
code: StatusCode::Internal,
msg: "fail to open shard",
msg: format!("fail to open shard, id:{}", shard_info.id),
})
}

Expand Down