Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failure to open a single table does not interrupt the shard's opening process #722

Merged
merged 11 commits into from
Mar 15, 2023
55 changes: 41 additions & 14 deletions server/src/grpc/meta_event_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use self::shard_operation::WalCloserAdapter;
use crate::{
grpc::{
meta_event_service::{
error::{ErrNoCause, ErrWithCause, Result, StatusCode},
error::{ErrNoCause, ErrWithCause, Error, Result, StatusCode},
shard_operation::WalRegionCloserRef,
},
metrics::META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
Expand Down Expand Up @@ -210,6 +210,7 @@ impl HandlerContext {
// implementation.

async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Result<()> {
let instant = Instant::now();
let tables_of_shard =
ctx.cluster
.open_shard(&request)
Expand All @@ -236,6 +237,10 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
table_engine: ctx.table_engine,
};

let mut success = 0;
let mut no_table_count = 0;
let mut open_err_count = 0;

for table in tables_of_shard.tables {
let schema = find_schema(default_catalog.clone(), &table.schema_name)?;

Expand All @@ -249,21 +254,43 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
shard_id: shard_info.id,
cluster_version: topology.cluster_topology_version,
};
schema
.open_table(open_request.clone(), opts.clone())
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to open table, open_request:{open_request:?}"),
})?
.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: format!("no table is opened, open_request:{open_request:?}"),
})?;
let result = schema.open_table(open_request.clone(), opts.clone()).await;

match result {
Ok(Some(_)) => {
success += 1;
}
Ok(None) => {
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
no_table_count += 1;
error!("no table is opened, open_request:{open_request:?}");
}
Err(e) => {
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
open_err_count += 1;
error!("fail to open table, open_request:{open_request:?}, err:{e}");
}
};
}

Ok(())
info!(
"Open shard finish, shard id:{}, cost:{}ms, successful count:{}, no table is opened count:{}, open error count:{}",
shard_info.id,
instant.saturating_elapsed().as_millis(),
success,
no_table_count,
open_err_count
);

if no_table_count == 0 && open_err_count == 0 {
Ok(())
} else {
Err(Error::ErrNoCause {
code: StatusCode::Internal,
msg: format!(
"Failed to open shard:{}, some tables open failed, no table is opened count:{}, open error count:{}",
shard_info.id, no_table_count, open_err_count
),
})
}
}

async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> {
Expand Down