Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failure to open a single table does not interrupt the shard's opening process #722

Merged
merged 11 commits into from
Mar 15, 2023
55 changes: 40 additions & 15 deletions server/src/grpc/meta_event_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

// Meta event rpc service implementation.

use std::{sync::Arc, time::Instant};
use std::{collections::HashMap, sync::Arc, time::Instant};

use analytic_engine::setup::OpenedWals;
use async_trait::async_trait;
Expand Down Expand Up @@ -41,7 +41,7 @@ use self::shard_operation::WalCloserAdapter;
use crate::{
grpc::{
meta_event_service::{
error::{ErrNoCause, ErrWithCause, Result, StatusCode},
error::{ErrNoCause, ErrWithCause, Error, Result, StatusCode},
shard_operation::WalRegionCloserRef,
},
metrics::META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
Expand Down Expand Up @@ -210,6 +210,7 @@ impl HandlerContext {
// implementation.

async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Result<()> {
let instant = Instant::now();
let tables_of_shard =
ctx.cluster
.open_shard(&request)
Expand All @@ -236,6 +237,9 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
table_engine: ctx.table_engine,
};

let mut success = 0;
let mut err_map = HashMap::new();

for table in tables_of_shard.tables {
let schema = find_schema(default_catalog.clone(), &table.schema_name)?;

Expand All @@ -249,21 +253,42 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
shard_id: shard_info.id,
cluster_version: topology.cluster_topology_version,
};
schema
.open_table(open_request.clone(), opts.clone())
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to open table, open_request:{open_request:?}"),
})?
.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: format!("no table is opened, open_request:{open_request:?}"),
})?;
let result = schema.open_table(open_request.clone(), opts.clone()).await;

match result {
Ok(Some(_)) => {
success += 1;
}
Ok(None) => {
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
error!("no table is opened, open_request:{open_request:?}");
err_map.insert(table.name, "no table is opened");
}
Err(e) => {
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
error!("fail to open table, open_request:{open_request:?}, err:{e}");
err_map.insert(table.name, "fail to open table");
}
};
}

Ok(())
info!(
"Open shard finish, shard id:{}, cost:{}ms, successful tables:{}, failed tables:{}",
shard_info.id,
instant.saturating_elapsed().as_millis(),
success,
err_map.len(),
);

if err_map.is_empty() {
Ok(())
} else {
Err(Error::ErrNoCause {
code: StatusCode::Internal,
msg: format!(
"Failed to open shard:{}, because of failed tables:{err_map:?}",
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
shard_info.id
),
})
}
}

async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> {
Expand Down